uid/uid.go
2023-04-20 16:40:02 +08:00

320 lines
8.1 KiB
Go

// Copyright 2022 @qoobing.com. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//
// Package uid provides unique id creator tools. It can be used to create
// a 'userid', a 'transaction id', or anything else that needs a globally
// unique id in a service cluster.
package uid
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/gammazero/deque"
"qoobing.com/gomod/log"
)
// IdCreator hands out ids as strings.
type IdCreator interface {
	// GetId returns a new id. The optional parts are caller-supplied
	// values that the implementation may use when building the id.
	GetId(parts ...any) string
}
// IdCreatorHelper is the set of user-supplied callbacks an id creator
// relies on to build prefixes, allocate id ranges and format final ids.
type IdCreatorHelper interface {
	// CreatePrefix builds the id prefix from parts. It is also called
	// with no parts at all to ask for a default prefix.
	CreatePrefix(parts ...any) string
	// CreateRange allocates a fresh id range for prefix (e.g. from the
	// database table t_id). The range covers length ids beginning at
	// start.
	CreateRange(prefix string) (start int64, length int, err error)
	// CreateMixedId builds the final id string from the prefix, the
	// numeric id taken from a range, and the caller's parts.
	CreateMixedId(prefix string, id int64, parts ...any) string
}
// Range describes one contiguous block of ids, [Start, End] inclusive,
// allocated for a single prefix.
//
// NOTE(review): Next is accessed with sync/atomic (see GetId). Keeping it
// as the first field keeps it 64-bit aligned on 32-bit platforms, so do
// not reorder the fields without checking the sync/atomic alignment rules.
type Range struct {
	// Next is the next id to hand out from this range.
	Next int64
	// Status is the range state. 0: invalid; 1: ok (RANGE_STATUS_OK);
	// 2: used up (RANGE_STATUS_USEOUT).
	Status int
	// Prefix is the id prefix this range was created for.
	Prefix string
	// Start is the first id of this range (inclusive).
	Start int64
	// End is the last id of this range (inclusive).
	End int64
	// Length is the number of ids in this range.
	Length int
}
// rangeQueue is a queue of *Range values. It embeds deque.Deque, so the
// deque's methods (Len, Front, PushBack, PopFront, ...) are called
// directly on it.
type rangeQueue struct {
	deque.Deque[*Range]
}
// Values stored in Range.Status.
const (
	// RANGE_STATUS_OK is the status given to a newly created range.
	RANGE_STATUS_OK = 1
	// RANGE_STATUS_USEOUT marks a range whose ids have all been handed
	// out; such ranges are dropped from the front of the queue.
	RANGE_STATUS_USEOUT = 2
	// RANGE_STATUS_REMOVED is declared but not assigned anywhere in the
	// visible part of this file.
	// NOTE(review): presumably meant for ranges already popped from the
	// queue - confirm or remove.
	RANGE_STATUS_REMOVED = 3
)
// idCreator is the IdCreator implementation returned by NewIdCreator. For
// every prefix it keeps a queue of pre-allocated id ranges together with
// a mutex dedicated to that queue.
type idCreator struct {
	// lock is held by createNewRange, which is where prefixes are
	// registered in Ranges and RangesLocks.
	lock sync.Mutex
	// Type is the name passed to NewIdCreator; it appears in log and
	// error messages.
	Type string
	// Ranges maps a prefix to its queue of id ranges.
	Ranges map[string]*rangeQueue
	// RangesLocks maps a prefix to the mutex taken around operations on
	// that prefix's queue.
	RangesLocks map[string]*sync.Mutex
	// Helper supplies prefix creation, range allocation and final id
	// formatting.
	Helper IdCreatorHelper
}
// NewIdCreator builds an id creator named typ on top of helper.
//
// If helper can produce a default prefix (CreatePrefix with no parts), the
// first range for that prefix is created immediately; a failure to create
// it panics. A background goroutine is then started that keeps the range
// queues filled; it is restarted one second after every exit and is never
// stopped.
func NewIdCreator(typ string, helper IdCreatorHelper) IdCreator {
	creator := &idCreator{
		Type:        typ,
		Helper:      helper,
		Ranges:      map[string]*rangeQueue{},
		RangesLocks: map[string]*sync.Mutex{},
	}

	// Warm up the default prefix, when the helper supports one.
	prefix, err := creator.createDefaultPrefix()
	if err == nil {
		creator.createNewRangeWithErrorPanic(prefix)
	}

	// supervise runs the background generator and restarts it whenever
	// it returns (it only returns after recovering from a panic).
	supervise := func() {
		for {
			log.Infof("start backgroud range generater, type=%s", typ)
			creator.backgroudTaskForGenerateRange()
			log.Infof("unexpect backgroud range generater exit")
			time.Sleep(time.Second)
		}
	}
	go supervise()

	return creator
}
// backgroudTaskForGenerateRange loops forever, topping up the range queue
// of every registered prefix:
//
//   - empty queue:      create a range, then pause 1ms
//   - 1 or 2 ranges:    create a range, then pause 50ms
//   - 3 or more ranges: pause 1s without creating anything
//
// After each pass over all prefixes it sleeps 3s, and every 50th pass it
// logs the queue lengths. The function only returns when something inside
// the loop panics (for example createNewRangeWithErrorPanic); the panic is
// recovered and logged as a warning.
//
// The prefix registry is only read while holding idg.lock and queue
// lengths are only read while holding the per-prefix mutex, because
// createNewRange may insert into the maps and queues concurrently, and an
// unsynchronized map iteration during a map write aborts the process.
func (idg *idCreator) backgroudTaskForGenerateRange() {
	defer func() {
		if r := recover(); r != nil {
			log.Warningf("backgroudTaskForGenerateRange panic: [%v]", r)
		}
	}()

	// knownPrefixes returns a snapshot of the registered prefixes.
	knownPrefixes := func() []string {
		idg.lock.Lock()
		defer idg.lock.Unlock()
		prefixes := make([]string, 0, len(idg.Ranges))
		for prefix := range idg.Ranges {
			prefixes = append(prefixes, prefix)
		}
		return prefixes
	}

	// queueLen returns the queue length for prefix; ok is false when the
	// prefix is not (or no longer) registered. idg.lock is released
	// before the per-prefix mutex is taken, so the two are never nested.
	queueLen := func(prefix string) (n int, ok bool) {
		idg.lock.Lock()
		queue, lock := idg.Ranges[prefix], idg.RangesLocks[prefix]
		idg.lock.Unlock()
		if queue == nil || lock == nil {
			return 0, false
		}
		lock.Lock()
		defer lock.Unlock()
		return queue.Len(), true
	}

	for i := 0; ; i++ {
		log.SetLogid(fmt.Sprintf("uidcreator%07d", i))
		report := i%50 == 0
		logStr := ""

		for _, prefix := range knownPrefixes() {
			qlen, ok := queueLen(prefix)
			if !ok {
				continue
			}

			switch {
			case qlen <= 0:
				// Nothing left for this prefix: refill and move on quickly.
				idg.createNewRangeWithErrorPanic(prefix)
				time.Sleep(1 * time.Millisecond)
			case qlen <= 2:
				// Running low: refill, then give the source a short rest.
				idg.createNewRangeWithErrorPanic(prefix)
				time.Sleep(50 * time.Millisecond)
			default:
				// Plenty of ranges queued: just wait.
				time.Sleep(1000 * time.Millisecond)
			}

			if report {
				if n, ok := queueLen(prefix); ok {
					logStr += fmt.Sprintf("queue-legth:(%s,%d);", prefix, n)
				}
			}
		}

		if report {
			log.Infof("backgroud idcreator info: %s", logStr)
		}
		time.Sleep(3000 * time.Millisecond)
	}
}
// GetId returns a new unique id built from idparts.
//
// The prefix comes from Helper.CreatePrefix. The numeric id is claimed
// from the current range of that prefix with a lock-free compare-and-swap
// on Range.Next. When the current range has no ids left it is marked
// RANGE_STATUS_USEOUT, turnToNextRange is called and the whole attempt is
// repeated. After 50 failed attempts the function logs an error and
// panics. On success the id is formatted by Helper.CreateMixedId.
func (idg *idCreator) GetId(idparts ...interface{}) string {
	const maxRetry = 50

	prefix := idg.Helper.CreatePrefix(idparts...)

	retry := 0
	for ; retry < maxRetry; retry++ {
		cur := idg.getCurrentRange(prefix)
		log.Infof("success get current range:(%s, %d-%d)",
			prefix, cur.Start, cur.End)

		// Claim the next id without taking a lock: reload Next and retry
		// the swap until it succeeds or the range turns out to be used up.
		var (
			id        int64
			exhausted bool
		)
		for {
			id = atomic.LoadInt64(&cur.Next)
			if exhausted = id > cur.End; exhausted {
				// NOTE(review): Status is written here without any
				// synchronization.
				cur.Status = RANGE_STATUS_USEOUT
				break
			}
			if atomic.CompareAndSwapInt64(&cur.Next, id, id+1) {
				break
			}
		}

		if !exhausted {
			log.Infof("success get id(not miexed):(%s, %d)", prefix, id)
			return idg.Helper.CreateMixedId(prefix, id, idparts...)
		}

		// The current range is used up: switch ranges and try again.
		log.Infof("failed get id from current range, try next")
		idg.turnToNextRange(cur)
	}

	log.Errorf("getid from range retry too much time(%d)", retry)
	panic("getid from range retry too much time")
}
// createDefaultPrefix asks the helper for a prefix without passing any
// parts. It returns an error when the helper panics or returns an empty
// prefix, i.e. when the helper does not support a default prefix.
func (idg *idCreator) createDefaultPrefix() (prefix string, err error) {
	unsupported := func() error {
		return fmt.Errorf("default prefix not surport by user")
	}

	// A helper that cannot work without parts may panic; report that as
	// an ordinary error.
	defer func() {
		if recover() != nil {
			err = unsupported()
		}
	}()

	prefix = idg.Helper.CreatePrefix()
	if prefix == "" {
		err = unsupported()
	}
	return prefix, err
}
// createNewRangeWithErrorPanic calls createNewRange for prefix and panics
// with the error text if it fails.
func (idg *idCreator) createNewRangeWithErrorPanic(prefix string) {
	err := idg.createNewRange(prefix)
	if err == nil {
		return
	}
	panic("create new range error:" + err.Error())
}
// removeOldRangeWithErrorPanic calls removeOldRange for prefix and panics
// with the error text if it fails.
func (idg *idCreator) removeOldRangeWithErrorPanic(prefix string) {
	err := idg.removeOldRange(prefix)
	if err == nil {
		return
	}
	panic("remove old range error:" + err.Error())
}
// createNewRange asks the helper for a fresh id range for prefix and
// appends it to that prefix's queue, registering the queue and its mutex
// on first use.
//
// It does nothing (and returns nil) when the queue already holds 30 or
// more ranges. It returns an error when Helper.CreateRange fails or
// reports a non-positive length; the error is also logged.
//
// Locking: idg.lock is held only while the registry maps are read or
// written, so a slow Helper.CreateRange call for one prefix does not block
// other prefixes. The per-prefix mutex is held for the rest of the call,
// which serializes range creation and queue access for a single prefix.
func (idg *idCreator) createNewRange(prefix string) error {
	// fail logs msg and returns it as an error.
	fail := func(msg string) error {
		log.Errorf("%s", msg)
		return errors.New(msg)
	}

	// Step 1. find (or register) the queue and mutex of this prefix.
	idg.lock.Lock()
	if _, ok := idg.RangesLocks[prefix]; !ok {
		idg.Ranges[prefix] = &rangeQueue{Deque: *deque.New[*Range]()}
		idg.RangesLocks[prefix] = &sync.Mutex{}
	}
	queue, lock := idg.Ranges[prefix], idg.RangesLocks[prefix]
	idg.lock.Unlock()

	lock.Lock()
	defer lock.Unlock()

	if queueLen := queue.Len(); queueLen >= 30 {
		log.Infof("ignore createNewRange beacuse of queue is long enough(%d)", queueLen)
		return nil
	}

	// Step 2. create range from db or something else.
	start, length, err := idg.Helper.CreateRange(prefix)
	if err != nil {
		return fail(fmt.Sprintf(
			"idCreator '%s' generate range err:'%s'",
			idg.Type, err.Error()))
	}
	if length <= 0 {
		// A range without ids would be marked used up on first access
		// and only cause another round trip to the helper.
		return fail(fmt.Sprintf(
			"idCreator '%s' generate range err:'invalid range length %d'",
			idg.Type, length))
	}

	// Step 3. push to ranges queue.
	r := &Range{
		Next:   start,
		Status: RANGE_STATUS_OK,
		Prefix: prefix,
		Start:  start,
		End:    start + int64(length) - 1,
		Length: length,
	}
	queue.PushBack(r)
	log.Infof("success createNewRange (%s, %d-%d)", prefix, start, r.End)
	return nil
}
// removeOldRange drops used-up ranges from the front of prefix's queue.
//
// Ranges are popped while the front one has status RANGE_STATUS_USEOUT;
// the first range with status RANGE_STATUS_OK stops the cleanup. Any other
// status, or a front element that changes between Front and PopFront,
// panics. An unknown prefix is logged and ignored. The returned error is
// always nil.
//
// The queue and its mutex are looked up while holding idg.lock, because
// createNewRange may insert into the registry maps concurrently. idg.lock
// is released before the per-prefix mutex is taken.
func (idg *idCreator) removeOldRange(prefix string) error {
	idg.lock.Lock()
	lock, ok := idg.RangesLocks[prefix]
	queue := idg.Ranges[prefix]
	idg.lock.Unlock()

	if !ok || lock == nil || queue == nil {
		log.Infof("rare case: rangequeue:(%s) have removed by others", prefix)
		return nil
	}

	lock.Lock()
	defer lock.Unlock()

	for queue.Len() > 0 {
		r := queue.Front()
		switch r.Status {
		case RANGE_STATUS_OK:
			// Everything in front of a usable range is already gone.
			log.Infof("success remove all useout ranges for prefix:%s", prefix)
			return nil
		case RANGE_STATUS_USEOUT:
			if rr := queue.PopFront(); rr != r {
				panic("queue head is changed, maybe somethong bug")
			}
			log.Infof("success remove old range:(%s,%d-%d), try remove next",
				prefix, r.Start, r.End)
		default:
			panic("invalid status, maybe somethong bug")
		}
	}
	return nil
}
// getCurrentRange returns the range at the front of prefix's queue.
//
// If the prefix has no queue yet, createNewRange is called (up to three
// times) to register one. If the queue exists but is empty, one more
// createNewRange is attempted before giving up. The function panics when
// no queue can be found or the queue is still empty afterwards; when the
// last createNewRange call returned an error, its text is appended to the
// panic message.
//
// The registry maps are only read while holding idg.lock, because
// createNewRange may insert into them concurrently. idg.lock is always
// released before createNewRange is called, since createNewRange takes
// the same lock itself.
func (idg *idCreator) getCurrentRange(prefix string) (r *Range) {
	var lastErr error

	// cause renders the most recent createNewRange error, if any, as a
	// suffix for the panic messages below.
	cause := func(err error) string {
		if err == nil {
			return ""
		}
		return ": " + err.Error()
	}

	// Step 0. get ranges queue by prefix, creating it when missing.
	idg.lock.Lock()
	queue, found := idg.Ranges[prefix]
	lock := idg.RangesLocks[prefix]
	idg.lock.Unlock()
	for attempt := 0; !found && attempt < 3; attempt++ {
		lastErr = idg.createNewRange(prefix)

		idg.lock.Lock()
		queue, found = idg.Ranges[prefix]
		lock = idg.RangesLocks[prefix]
		idg.lock.Unlock()
	}
	if !found || queue == nil || lock == nil {
		panic("maybe something bug or too many pressures," +
			"cannot found range queue after createNewRange retry 3 times" +
			cause(lastErr))
	}

	// front returns the head of the queue under the per-prefix mutex.
	front := func() (*Range, bool) {
		lock.Lock()
		defer lock.Unlock()
		if queue.Len() == 0 {
			return nil, false
		}
		return queue.Front(), true
	}

	// Step 1. try read from queue directly.
	if cur, ok := front(); ok {
		return cur
	}

	// Step 2. retry create a new range & try again.
	lastErr = idg.createNewRange(prefix)
	if cur, ok := front(); ok {
		return cur
	}
	panic("get current range failed, too many pressure???" + cause(lastErr))
}
// turnToNextRange reacts to range r having run out of ids.
//
// When r is marked RANGE_STATUS_USEOUT, a new range is created for r's
// prefix and the used-up ranges are removed from the queue; both steps
// panic on error. When r is still RANGE_STATUS_OK nothing is done. Any
// other status panics.
//
// The range itself is deliberately not locked: running createNewRange
// twice for the same exhausted range has no serious side effect.
func (idg *idCreator) turnToNextRange(r *Range) {
	status := r.Status

	if status == RANGE_STATUS_USEOUT {
		idg.createNewRangeWithErrorPanic(r.Prefix)
		idg.removeOldRangeWithErrorPanic(r.Prefix)
		return
	}

	if status == RANGE_STATUS_OK {
		// Somebody else already switched ranges.
		log.Debugf("have turn by others, we do nothing.")
		return
	}

	panic("invalid status, maybe somethong bug")
}