// Copyright 2022 @qoobing.com. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. // // Unique id creator tools, It can be used to create 'userid' or // 'transaction id' or something else need global unique id in a // service cluster. package uid import ( "errors" "fmt" "sync" "sync/atomic" "time" "github.com/gammazero/deque" "qoobing.com/gomod/log" ) // IdCreator type IdCreator interface { GetId(parts ...interface{}) string } // IdCreatorHelper type IdCreatorHelper interface { // CreatePrefix create id prefix CreatePrefix(parts ...interface{}) string // CreateRange create id range from database table t_id. CreateRange(prefix string) (start int64, length int, err error) // CreateMixedId create finnal transaction id CreateMixedId(prefix string, id int64, parts ...interface{}) string } // Range Range define the range [start, end] of ids. type Range struct { Next int64 // next id Status int // 0:invlid; 1:ok; 2:useout Prefix string // prefix Start int64 // begin id of this range(include) End int64 // end id of this range(include) Length int // lenght of this range } type rangeQueue struct { deque.Deque[*Range] } const ( RANGE_STATUS_OK = 1 RANGE_STATUS_USEOUT = 2 RANGE_STATUS_REMOVED = 3 ) // idCreator type idCreator struct { lock sync.Mutex Type string Ranges map[string]*rangeQueue RangesLocks map[string]*sync.Mutex Helper IdCreatorHelper } // NewIdCreator to new a id creator by helper. func NewIdCreator(typ string, helper IdCreatorHelper) IdCreator { // Step 1. basic value var idg = &idCreator{ lock: sync.Mutex{}, Type: typ, Ranges: map[string]*rangeQueue{}, RangesLocks: map[string]*sync.Mutex{}, Helper: helper, } // Step 2. generate the first range. if prefix, err := idg.createDefaultPrefix(); err == nil { idg.createNewRangeWithErrorPanic(prefix) } // Step 3. backgroud task for generate range go func() { for { log.Infof("start backgroud range generater, type=%s", typ) idg.backgroudTaskForGenerateRange() log.Infof("unexpect backgroud range generater exit") time.Sleep(1000 * time.Millisecond) } }() return idg } func (idg *idCreator) backgroudTaskForGenerateRange() { // rocover panic defer func() { if r := recover(); r != nil { log.Warningf("backgroudTaskForGenerateRange panic: [%v]", r) } }() // doing range generator for i := 0; ; i++ { log.SetLogid(fmt.Sprintf("uidcreator%07d", i)) log.PrintPretty("backgroud info, ranges:", idg.Ranges) for prefix, queue := range idg.Ranges { var qlen = queue.Len() var timeoutMs = 1 * time.Millisecond if qlen <= 0 { timeoutMs = 1 * time.Millisecond } else if qlen <= 2 { timeoutMs = 50 * time.Millisecond } else { timeoutMs = -1000 * time.Millisecond } if int(timeoutMs) > 0 { idg.createNewRangeWithErrorPanic(prefix) time.Sleep(timeoutMs) } else if int(timeoutMs) < 0 { time.Sleep(-timeoutMs) } } time.Sleep(3000 * time.Millisecond) } } func (idg *idCreator) GetId(idparts ...interface{}) string { var retry int var prefix = idg.Helper.CreatePrefix(idparts...) for retry = 0; retry < 50; retry++ { var id int64 = 0 var currange = idg.getCurrentRange(prefix) log.Infof("success get current range:(%s, %d-%d)", prefix, currange.Start, currange.End) // Step 1. Optimistic[no lock] to add id in current range. for { id = atomic.LoadInt64(&currange.Next) if id > currange.End { //next range currange.Status = RANGE_STATUS_USEOUT break } if atomic.CompareAndSwapInt64(&currange.Next, id, id+1) { //success break } } // Step 2. Presious [no lock] adding failed, try add id in next range. if id > currange.End { //next range log.Infof("failed get id from current range, try next") idg.turnToNextRange(currange) continue } // Step 3. Success, return mixed id. log.Infof("success get id(not miexed):(%s, %d)", prefix, id) return idg.Helper.CreateMixedId(prefix, id, idparts...) } log.Errorf("getid from range retry too much time(%d)", retry) panic("getid from range retry too much time") } func (idg *idCreator) createDefaultPrefix() (prefix string, err error) { defer func() { if r := recover(); r != nil { err = fmt.Errorf("default prefix not surport by user") } }() if prefix = idg.Helper.CreatePrefix(); len(prefix) == 0 { err = fmt.Errorf("default prefix not surport by user") } return prefix, err } func (idg *idCreator) createNewRangeWithErrorPanic(prefix string) { if err := idg.createNewRange(prefix); err != nil { panic("create new range error:" + err.Error()) } } func (idg *idCreator) removeOldRangeWithErrorPanic(prefix string) { if err := idg.removeOldRange(prefix); err != nil { panic("remove old range error:" + err.Error()) } } func (idg *idCreator) createNewRange(prefix string) error { // Step 1. lock generator. idg.lock.Lock() defer idg.lock.Unlock() if _, ok := idg.RangesLocks[prefix]; !ok { idg.Ranges[prefix] = &rangeQueue{Deque: *deque.New[*Range]()} idg.RangesLocks[prefix] = &sync.Mutex{} } var queue = idg.Ranges[prefix] var lock = idg.RangesLocks[prefix] lock.Lock() defer lock.Unlock() var queueLen = queue.Len() if queueLen >= 30 { log.Infof("ignore createNewRange beacuse of queue is long enough(%d)", queueLen) return nil } // Step 2. create range from db or something else. start, length, err := idg.Helper.CreateRange(prefix) if err != nil { str := fmt.Sprintf( "idCreator '%s' generate range err:'%s'", idg.Type, err.Error()) log.Errorf("%s", str) return errors.New(str) } // Step 3. push to ranges queue. var r = &Range{ Next: start, Status: RANGE_STATUS_OK, Prefix: prefix, Start: start, End: start + int64(length) - 1, Length: length, } queue.PushBack(r) log.Infof("success createNewRange (%s, %d-%d)", prefix, start, r.End) return nil } func (idg *idCreator) removeOldRange(prefix string) error { if lock, ok := idg.RangesLocks[prefix]; ok { lock.Lock() defer lock.Unlock() queue := idg.Ranges[prefix] for queue.Len() > 0 { r := queue.Front() if r.Status == RANGE_STATUS_OK { log.Infof("success remove all useout ranges for prefix:%s", prefix) return nil } else if r.Status == RANGE_STATUS_USEOUT { if rr := queue.PopFront(); rr != r { panic("queue head is changed, maybe somethong bug") } log.Infof("success remove old range:(%s,%d-%d), try remove next", prefix, r.Start, r.End) } else { panic("invalid status, maybe somethong bug") } } } else { log.Infof("rare case: rangequeue:(%s) have removed by others", prefix) } return nil } func (idg *idCreator) getCurrentRange(prefix string) (r *Range) { var ( ok = false lock *sync.Mutex = nil queue *rangeQueue = nil ) // Step 0. get ranges queue by prefix. for trycount := 0; !ok && trycount < 3; trycount++ { if queue, ok = idg.Ranges[prefix]; ok { lock = idg.RangesLocks[prefix] break } // try create new range idg.createNewRange(prefix) } if !ok { panic("maybe something bug or too many pressures," + "cannot found range queue after createNewRange retry 3 times") } // Step 1. try read from queue directly. lock.Lock() if queue.Len() >= 1 { r = queue.Front() lock.Unlock() return r } lock.Unlock() // Step 2. retry create a new range & try again. idg.createNewRange(prefix) lock.Lock() defer lock.Unlock() if queue.Len() >= 1 { r = queue.Front() return r } panic("get current range failed, too many pressure???") } func (idg *idCreator) turnToNextRange(r *Range) { // We donot lock the range, because there is no too much // side effect even if 'createNewRange' execute twice. switch r.Status { case RANGE_STATUS_USEOUT: idg.createNewRangeWithErrorPanic(r.Prefix) idg.removeOldRangeWithErrorPanic(r.Prefix) case RANGE_STATUS_OK: // have turn by others, we do nothing. log.Debugf("have turn by others, we do nothing.") default: panic("invalid status, maybe somethong bug") } }