284 lines
		
	
	
		
			7.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			284 lines
		
	
	
		
			7.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2022 @qoobing.com. All rights reserved.
 | |
| // Use of this source code is governed by a BSD-style
 | |
| // license that can be found in the LICENSE file.
 | |
| //
 | |
| // Unique id creator tools, It can be used to create 'userid' or
 | |
| // 'transaction id' or something else need global unique id in a
 | |
| // service cluster.
 | |
| package uid
 | |
| 
 | |
| import (
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/gammazero/deque"
 | |
| 	"qoobing.com/gomod/log"
 | |
| )
 | |
| 
 | |
| // IdCreator
 | |
| type IdCreator interface {
 | |
| 	GetId(parts ...interface{}) string
 | |
| }
 | |
| 
 | |
| // IdCreatorHelper
 | |
| type IdCreatorHelper interface {
 | |
| 	// CreatePrefix create id prefix
 | |
| 	CreatePrefix(parts ...interface{}) string
 | |
| 	// CreateRange create id range from database table t_id.
 | |
| 	CreateRange(prefix string) (start int64, length int, err error)
 | |
| 	// CreateMixedId create finnal transaction id
 | |
| 	CreateMixedId(prefix string, id int64, parts ...interface{}) string
 | |
| }
 | |
| 
 | |
| // Range Range define the range [start, end] of ids.
 | |
| type Range struct {
 | |
| 	Next   int64  // next id
 | |
| 	Status int    // 0:invlid; 1:ok; 2:useout
 | |
| 	Prefix string // prefix
 | |
| 	Start  int64  // begin id of this range(include)
 | |
| 	End    int64  // end id of this range(include)
 | |
| 	Length int    // lenght of this range
 | |
| }
 | |
| type rangeQueue struct {
 | |
| 	//deque.Deque[*Range]
 | |
| }
 | |
| 
 | |
| const (
 | |
| 	RANGE_STATUS_OK     = 1
 | |
| 	RANGE_STATUS_USEOUT = 2
 | |
| )
 | |
| 
 | |
| // idCreator
 | |
| type idCreator struct {
 | |
| 	lock        sync.Mutex
 | |
| 	Type        string
 | |
| 	Ranges      map[string]*rangeQueue
 | |
| 	RangesLocks map[string]*sync.Mutex
 | |
| 	Helper      IdCreatorHelper
 | |
| }
 | |
| 
 | |
| // NewIdCreator to new a id creator by helper.
 | |
| func NewIdCreator(typ string, helper IdCreatorHelper) IdCreator {
 | |
| 	// Step 1. basic value
 | |
| 	var idg = &idCreator{
 | |
| 		lock:        sync.Mutex{},
 | |
| 		Type:        typ,
 | |
| 		Ranges:      map[string]*rangeQueue{},
 | |
| 		RangesLocks: map[string]*sync.Mutex{},
 | |
| 		Helper:      helper,
 | |
| 	}
 | |
| 
 | |
| 	// Step 2. generate the first range.
 | |
| 	if prefix, err := idg.createDefaultPrefix(); err == nil {
 | |
| 		idg.createNewRangeWithErrorPanic(prefix)
 | |
| 	}
 | |
| 
 | |
| 	// Step 3. backgroud task for generate range
 | |
| 	go func() {
 | |
| 		for i := 0; ; i++ {
 | |
| 			log.SetLogid(fmt.Sprintf("uidcreator%07d", i))
 | |
| 			for prefix, queue := range idg.Ranges {
 | |
| 				var qlen = queue.Len()
 | |
| 				var timeoutMs = 1 * time.Millisecond
 | |
| 
 | |
| 				if qlen <= 0 {
 | |
| 					timeoutMs = 1 * time.Millisecond
 | |
| 				} else if qlen <= 2 {
 | |
| 					timeoutMs = 50 * time.Millisecond
 | |
| 				} else {
 | |
| 					timeoutMs = -1000 * time.Millisecond
 | |
| 				}
 | |
| 
 | |
| 				if int(timeoutMs) > 0 {
 | |
| 					idg.createNewRangeWithErrorPanic(prefix)
 | |
| 					time.Sleep(timeoutMs * time.Millisecond)
 | |
| 				} else if int(timeoutMs) < 0 {
 | |
| 					time.Sleep((-timeoutMs) * time.Millisecond)
 | |
| 				}
 | |
| 			}
 | |
| 			time.Sleep(3000 * time.Millisecond)
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	return idg
 | |
| }
 | |
| 
 | |
| func (idg *idCreator) GetId(idparts ...interface{}) string {
 | |
| 	var prefix = idg.Helper.CreatePrefix(idparts...)
 | |
| 	for {
 | |
| 		var id int64 = 0
 | |
| 		var currange = idg.getCurrentRange(prefix)
 | |
| 		log.Infof("success get current range:(%s, %d-%d)",
 | |
| 			prefix, currange.Start, currange.End)
 | |
| 		// Step 1. Optimistic[no lock] to add id in current range.
 | |
| 		for {
 | |
| 			id = atomic.LoadInt64(&currange.Next)
 | |
| 			if id > currange.End {
 | |
| 				//next range
 | |
| 				break
 | |
| 			}
 | |
| 			if atomic.CompareAndSwapInt64(&currange.Next, id, id+1) {
 | |
| 				//success
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		// Step 2. No lock add failed, try add id in next range.
 | |
| 		if id > currange.End {
 | |
| 			//next range
 | |
| 			log.Infof("failed get id from current range, try next")
 | |
| 			idg.turnToNextRange(currange)
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		// Step 3. Success, return mixed id.
 | |
| 		log.Infof("success get id(not miexed):(%s, %d)", prefix, id)
 | |
| 		return idg.Helper.CreateMixedId(prefix, id, idparts...)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (idg *idCreator) createDefaultPrefix() (prefix string, err error) {
 | |
| 	defer func() {
 | |
| 		if r := recover(); r != nil {
 | |
| 			err = fmt.Errorf("default prefix not surport by user")
 | |
| 		}
 | |
| 	}()
 | |
| 	if prefix = idg.Helper.CreatePrefix(); len(prefix) == 0 {
 | |
| 		err = fmt.Errorf("default prefix not surport by user")
 | |
| 	}
 | |
| 	return prefix, err
 | |
| }
 | |
| 
 | |
| func (idg *idCreator) createNewRangeWithErrorPanic(prefix string) {
 | |
| 	if err := idg.createNewRange(prefix); err != nil {
 | |
| 		panic("create new range error:" + err.Error())
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (idg *idCreator) removeOldRangeWithErrorPanic(prefix string) {
 | |
| 	if err := idg.removeOldRange(prefix); err != nil {
 | |
| 		panic("remove old range error:" + err.Error())
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (idg *idCreator) createNewRange(prefix string) error {
 | |
| 	// Step 1. create range from db or something else.
 | |
| 	start, length, err := idg.Helper.CreateRange(prefix)
 | |
| 	if err != nil {
 | |
| 		str := fmt.Sprintf(
 | |
| 			"idCreator '%s' generate range err:'%s'",
 | |
| 			idg.Type, err.Error())
 | |
| 		log.Errorf("%s", str)
 | |
| 		return errors.New(str)
 | |
| 	}
 | |
| 
 | |
| 	// Step 2. lock generator.
 | |
| 	idg.lock.Lock()
 | |
| 	defer idg.lock.Unlock()
 | |
| 	if _, ok := idg.RangesLocks[prefix]; !ok {
 | |
| 		idg.Ranges[prefix] = &rangeQueue{Deque: *deque.New[*Range]()}
 | |
| 		idg.RangesLocks[prefix] = &sync.Mutex{}
 | |
| 	}
 | |
| 	var queue = idg.Ranges[prefix]
 | |
| 	var lock = idg.RangesLocks[prefix]
 | |
| 	lock.Lock()
 | |
| 	defer lock.Unlock()
 | |
| 
 | |
| 	// Step 3. push to ranges queue.
 | |
| 	var r = &Range{
 | |
| 		Next:   start,
 | |
| 		Status: RANGE_STATUS_OK,
 | |
| 		Prefix: prefix,
 | |
| 		Start:  start,
 | |
| 		End:    start + int64(length) - 1,
 | |
| 		Length: length,
 | |
| 	}
 | |
| 	queue.PushBack(r)
 | |
| 	log.Infof("success createNewRange (%s, %d-%d)", prefix, start, r.End)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (idg *idCreator) removeOldRange(prefix string) error {
 | |
| 	if lock, ok := idg.RangesLocks[prefix]; ok {
 | |
| 		lock.Lock()
 | |
| 		defer lock.Unlock()
 | |
| 
 | |
| 		queue := idg.Ranges[prefix]
 | |
| 		for queue.Len() > 0 {
 | |
| 			r := queue.Front()
 | |
| 			if r.Status == RANGE_STATUS_OK {
 | |
| 				break
 | |
| 			} else if r.Status == RANGE_STATUS_USEOUT {
 | |
| 				queue.PopFront()
 | |
| 				log.Infof("success remove old range:(%s,%d-%d)",
 | |
| 					prefix, r.Start, r.End)
 | |
| 			} else {
 | |
| 				panic("invalid status, maybe somethong bug")
 | |
| 			}
 | |
| 		}
 | |
| 	} else {
 | |
| 		log.Infof("rare case: rangequeue:(%s) have remove by others", prefix)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (idg *idCreator) getCurrentRange(prefix string) (r *Range) {
 | |
| 	var (
 | |
| 		ok                = false
 | |
| 		lock  *sync.Mutex = nil
 | |
| 		queue *rangeQueue = nil
 | |
| 	)
 | |
| 
 | |
| 	// Step 0. get ranges queue by prefix.
 | |
| 	for trycount := 0; !ok && trycount < 3; trycount++ {
 | |
| 		if queue, ok = idg.Ranges[prefix]; ok {
 | |
| 			lock = idg.RangesLocks[prefix]
 | |
| 			break
 | |
| 		}
 | |
| 		// try create new range
 | |
| 		idg.createNewRange(prefix)
 | |
| 	}
 | |
| 	if !ok {
 | |
| 		panic("maybe something bug or too many pressures," +
 | |
| 			"cannot found range queue after createNewRange retry 3 times")
 | |
| 	}
 | |
| 
 | |
| 	// Step 1. try read from queue directly.
 | |
| 	lock.Lock()
 | |
| 	if queue.Len() >= 1 {
 | |
| 		r = queue.Front()
 | |
| 		lock.Unlock()
 | |
| 		return r
 | |
| 	}
 | |
| 	lock.Unlock()
 | |
| 
 | |
| 	// Step 2. retry create a new range & try again.
 | |
| 	idg.createNewRange(prefix)
 | |
| 	lock.Lock()
 | |
| 	defer lock.Unlock()
 | |
| 	if queue.Len() >= 1 {
 | |
| 		r = queue.Front()
 | |
| 		return r
 | |
| 	}
 | |
| 	panic("get current range failed, too many pressure???")
 | |
| }
 | |
| 
 | |
| func (idg *idCreator) turnToNextRange(r *Range) {
 | |
| 	// We donot lock the range, because there is no too much
 | |
| 	// side effect even if 'createNewRange' execute twice.
 | |
| 	switch r.Status {
 | |
| 	case RANGE_STATUS_OK:
 | |
| 		r.Status = RANGE_STATUS_USEOUT
 | |
| 		idg.createNewRangeWithErrorPanic(r.Prefix)
 | |
| 		idg.removeOldRangeWithErrorPanic(r.Prefix)
 | |
| 	case RANGE_STATUS_USEOUT:
 | |
| 		// have turn by others, we do nothing.
 | |
| 		log.Debugf("have turn by others, we do nothing.")
 | |
| 	default:
 | |
| 		panic("invalid status, maybe somethong bug")
 | |
| 	}
 | |
| }
 | 
