Compare commits

...

9 Commits

Author Message Date
bryanqiu
684d82f7bd change log 2023-04-20 16:40:02 +08:00
bryanqiu
cc94c074cc add test & fix cocurrent bug 2023-04-20 16:16:25 +08:00
bryanqiu
229bfb80cd add logid 2022-12-05 10:50:58 +08:00
bryanqiu
ac4c30b171 add logid 2022-12-05 10:49:36 +08:00
bryanqiu
f6c68dec7c v1.0.5 2022-12-03 19:56:56 +08:00
bryanqiu
c1b56c8472 fix bug 2022-12-02 17:55:47 +08:00
bryanqiu
79e49306ef rename TXID to SIGNID 2022-12-02 14:16:36 +08:00
bryanqiu
d13a1e4292 add model t_uid 2022-12-02 11:49:26 +08:00
bryanqiu
6f7e10aecb v1.0.1 2022-11-23 16:02:30 +08:00
6 changed files with 464 additions and 117 deletions

14
go.mod
View File

@ -2,4 +2,16 @@ module qoobing.com/gomod/uid
go 1.19 go 1.19
require github.com/gammazero/deque v0.2.1 require (
github.com/gammazero/deque v0.2.1
gorm.io/gorm v1.25.0
qoobing.com/gomod/log v1.2.2
)
require (
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/tylerb/gls v0.0.0-20150407001822-e606233f194d // indirect
github.com/tylerb/is v2.1.4+incompatible // indirect
qoobing.com/gomod/str v1.0.1 // indirect
)

16
go.sum Normal file
View File

@ -0,0 +1,16 @@
github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0=
github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/tylerb/gls v0.0.0-20150407001822-e606233f194d h1:yYYPFFlbqxF5mrj5sEfETtM/Ssz2LTy0/VKlDdXYctc=
github.com/tylerb/gls v0.0.0-20150407001822-e606233f194d/go.mod h1:0MwyId/pXK5wkYYEXe7NnVknX+aNBuF73fLV3U0reU8=
github.com/tylerb/is v2.1.4+incompatible h1:BMf2zP0kY2Ykzx2W1fDrjwKj1x1B4E0mELkpjaNy1tM=
github.com/tylerb/is v2.1.4+incompatible/go.mod h1:3Bw2NWEEe8Kx7/etYqgm9ug53iNDgabnloch75jjOSc=
gorm.io/gorm v1.25.0 h1:+KtYtb2roDz14EQe4bla8CbQlmb9dN3VejSai3lprfU=
gorm.io/gorm v1.25.0/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=
qoobing.com/gomod/log v1.2.2 h1:6h7cgfhIm4cQNASC7KDJ8TyLn3pb7eYJV9JO+vXYwDA=
qoobing.com/gomod/log v1.2.2/go.mod h1:/ZTN/ukAbSqRb4eMlF9LpfkVgM21xwprbd5y3tcQxpM=
qoobing.com/gomod/str v1.0.1 h1:X+JOigE9xA6cTNph7/s1KeD4zLYM9XTLPPHQcpHFoog=
qoobing.com/gomod/str v1.0.1/go.mod h1:gbhN2dba/P5gFRGVJvEI57KEJLlMHHAd6Kuuxn4GlMY=

145
model/t_uid/t_uid.go Normal file
View File

@ -0,0 +1,145 @@
package t_uid
import (
"fmt"
"time"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"qoobing.com/gomod/log"
)
type t_uid struct {
F_id int64 `gorm:"column:F_id;primaryKey"` // ID范围id
F_pid int64 `gorm:"column:F_pid"` // ID范围的前一个范围id
F_type string `gorm:"column:F_type"` // ID类型
F_prefix string `gorm:"column:F_prefix"` // ID前缀(type+prefix+range_start唯一)
F_range_start int64 `gorm:"column:F_range_start"` // ID范围的起始id
F_range_length int `gorm:"column:F_range_length"` // ID范围的长度
F_range_owner string `gorm:"column:F_range_owner"` // ID范围的拥有者一般是机器id
F_create_time time.Time `gorm:"column:F_create_time"` // 记录创建时间
F_modify_time time.Time `gorm:"column:F_modify_time"` // 记录更新时间
}
type typeInfo struct {
Name string //type name
RangeLength int //Range length
}
var table string = "t_uid" // id range table name
var typeSupported = map[string]typeInfo{
"SIGNID": typeInfo{
Name: "SIGNID",
RangeLength: 3,
},
"USERID": typeInfo{
Name: "USERID",
RangeLength: 3,
},
}
func Init(tablename string) {
table = tablename
}
func (id *t_uid) TableName() string {
return table
}
func (id *t_uid) BeforeCreate(db *gorm.DB) error {
if id.F_create_time.IsZero() {
id.F_create_time = time.Now()
id.F_modify_time = time.Now()
}
return nil
}
func (id *t_uid) BeforeUpdate(db *gorm.DB) (err error) {
id.F_modify_time = time.Now()
return nil
}
func CreateIdRange(db *gorm.DB, typ, prefix string) (start int64, length int, err error) {
// Step 1. init variables
if t, ok := typeSupported[typ]; !ok {
panic(fmt.Sprintf("id type [%s] not supported yet", typ))
} else {
length = t.RangeLength
}
var (
lock = clause.Locking{Strength: "UPDATE"}
root = t_uid{}
rootcond = map[string]interface{}{
"F_type": fmt.Sprintf("%s#ROOT", typ),
"F_prefix": prefix,
"F_pid": 0,
}
rootattr = map[string]interface{}{
"F_range_start": -length,
"F_range_length": length,
"F_range_owner": "0",
}
selfid = "TODO"
)
log.Infof("start CreateIdRange...")
// Step 2. get lastid && lock root(type+prefix)
var tx = db.Begin()
var txsuccess = false
defer func() {
if !txsuccess {
tx.Rollback()
}
}()
txr := tx.Clauses(lock).
Where(rootcond).
Attrs(rootattr).
FirstOrCreate(&root)
if txr.Error != nil {
return 0, 0, txr.Error
}
if root.F_range_owner == "0" {
// for new created root we set owner to itself.
root.F_range_owner = fmt.Sprintf("%d", root.F_id)
}
log.PrintPretty("root:", root)
// Step 3. insert new range. [can DELETE this for perfermance]
var newrange = t_uid{
F_type: typ,
F_prefix: prefix,
F_range_start: root.F_range_start + int64(length),
F_range_length: length,
F_range_owner: selfid,
}
if n, e := fmt.Sscanf(root.F_range_owner, "%d", &newrange.F_pid); e != nil {
log.Debugf("sscanf F_range_owner(%s) failed: '%s'", root.F_range_owner, e)
return 0, 0, fmt.Errorf("range_owner error")
} else if n != 1 {
log.Debugf("sscanf F_range_owner(%s) failed: n=%d", root.F_range_owner, n)
return 0, 0, fmt.Errorf("range_owner error")
}
if txr := tx.Create(&newrange); txr.Error != nil {
log.PrintPretty("newrange:", newrange)
return 0, 0, txr.Error
}
// Step 4. update root
root.F_range_start = newrange.F_range_start
root.F_range_length = newrange.F_range_length
root.F_range_owner = fmt.Sprintf("%d", newrange.F_id)
if txr := tx.Save(&root); txr.Error != nil {
return 0, 0, txr.Error
}
// Step 5. commit and return
if txr := tx.Commit(); txr.Error != nil {
return 0, 0, txr.Error
}
txsuccess = true
start = newrange.F_range_start
length = newrange.F_range_length
log.Infof("create range success:(%s, %s, %d, %d)", typ, prefix, start, length)
return start, length, nil
}

0
test/logs/.gitkeep Normal file
View File

85
test/main.go Normal file
View File

@ -0,0 +1,85 @@
package main
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"qoobing.com/gomod/log"
"qoobing.com/gomod/uid"
)
type MemCreatorHelper struct {
start int64
length int
lock sync.Mutex
}
func NewMemCreatorHelper() *MemCreatorHelper {
return &MemCreatorHelper{
lock: sync.Mutex{},
start: 0,
length: 100,
}
}
// CreatePrefix create id prefix
func (mch *MemCreatorHelper) CreatePrefix(parts ...interface{}) string {
return "XXXPPPYYY"
}
// CreateRange create id range from database table t_id.
func (mch *MemCreatorHelper) CreateRange(prefix string) (start int64, length int, err error) {
mch.lock.Lock()
defer mch.lock.Unlock()
fmt.Println("CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCreateRange")
mch.start += int64(mch.length)
if mch.start == 300 {
panic("test panic: 3")
} else if mch.start == 7000 {
return mch.start, mch.length, errors.New("tesk error: 7")
}
return mch.start, mch.length, nil
}
// // CreateMixedId create finnal transaction id
func (mch *MemCreatorHelper) CreateMixedId(prefix string, id int64, parts ...interface{}) string {
return fmt.Sprintf("id-%s-%d", prefix, id)
}
var helper = NewMemCreatorHelper()
var idcreator = uid.NewIdCreator("test", helper)
func getId(i, ii int) {
defer func() {
if r := recover(); r != nil {
fmt.Println("panic recover:", r)
}
}()
fmt.Printf("%d-%d: %s\n", i, ii, idcreator.GetId())
}
func main() {
c := atomic.Uint64{}
st := time.Now()
wg := sync.WaitGroup{}
wg.Add(1000)
for i := 0; i < 1000; i++ {
go func(i int) {
log.SetLogid(fmt.Sprintf("main idcreator%07d", i))
for ii := 0; ii < 1000; ii++ {
getId(i, ii)
c.Add(uint64(1))
}
wg.Done()
}(i)
}
wg.Wait()
et := time.Now()
fmt.Println("cost:", et.Sub(st))
fmt.Println("helper:", helper)
fmt.Println("all done, total id number is:", c.Load())
}

321
uid.go
View File

@ -8,12 +8,14 @@
package uid package uid
import ( import (
"errors"
"fmt" "fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/gammazero/deque" "github.com/gammazero/deque"
"qoobing.com/gomod/log"
) )
// IdCreator // IdCreator
@ -24,60 +26,63 @@ type IdCreator interface {
// IdCreatorHelper // IdCreatorHelper
type IdCreatorHelper interface { type IdCreatorHelper interface {
// CreatePrefix create id prefix // CreatePrefix create id prefix
CreatePrefix(parts ...interface{}) CreatePrefix(parts ...interface{}) string
// CreateRange create id range from database table t_id. // CreateRange create id range from database table t_id.
CreateRange(prefix string) (start, length int64, err error) CreateRange(prefix string) (start int64, length int, err error)
// CreateMixedId create finnal transaction id // CreateMixedId create finnal transaction id
CreateMixedId(prefix string, id int64, parts ...interface{}) string CreateMixedId(prefix string, id int64, parts ...interface{}) string
} }
// idCreater // Range Range define the range [start, end] of ids.
type idCreater struct { type Range struct {
lock sync.Mutex Next int64 // next id
Type string Status int // 0:invlid; 1:ok; 2:useout
Ranges map[string]*deque.Deque Prefix string // prefix
Helper IdCreateHelper Start int64 // begin id of this range(include)
End int64 // end id of this range(include)
Length int // lenght of this range
}
type rangeQueue struct {
deque.Deque[*Range]
} }
// NewIdCreater to new a id creator by helper. const (
func NewIdCreater(typ string, helper IdCreatorHelper) IdCreater { RANGE_STATUS_OK = 1
RANGE_STATUS_USEOUT = 2
RANGE_STATUS_REMOVED = 3
)
// idCreator
type idCreator struct {
lock sync.Mutex
Type string
Ranges map[string]*rangeQueue
RangesLocks map[string]*sync.Mutex
Helper IdCreatorHelper
}
// NewIdCreator to new a id creator by helper.
func NewIdCreator(typ string, helper IdCreatorHelper) IdCreator {
// Step 1. basic value // Step 1. basic value
var idg = &idCreater{ var idg = &idCreator{
lock: sync.Mutex, lock: sync.Mutex{},
Type: typ, Type: typ,
Ranges: map[string]*deque.Deque{}, Ranges: map[string]*rangeQueue{},
Helper: helper, RangesLocks: map[string]*sync.Mutex{},
Helper: helper,
} }
// Step 2. generate the first range. // Step 2. generate the first range.
if prefix, err = idg.createDefaultPrefix(); err == nil { if prefix, err := idg.createDefaultPrefix(); err == nil {
if err := idg.createNewRange(prefix); err != nil { idg.createNewRangeWithErrorPanic(prefix)
log.Errorf("generate the first range failed: %s", err)
}
} }
// Step 3. backgroud task for generate range // Step 3. backgroud task for generate range
go func() { go func() {
for { for {
for prefix, queue := range idg.Ranges { log.Infof("start backgroud range generater, type=%s", typ)
var qlen = queue.Len() idg.backgroudTaskForGenerateRange()
var timeoutMs = 1 log.Infof("unexpect backgroud range generater exit")
if queueLen <= 0 {
timeoutMs = 1
} else if queueLen <= 2 {
timeoutMs = 50
} else {
timeoutMs = -1000
}
if timeoutMs > 0 {
idg.createNewRange(prefix)
time.Sleep(timeoutMs * time.Millisecond)
} else if timeoutMs < 0 {
time.Sleep((-timeoutMs) * time.Millisecond)
}
}
time.Sleep(1000 * time.Millisecond) time.Sleep(1000 * time.Millisecond)
} }
}() }()
@ -85,47 +90,86 @@ func NewIdCreater(typ string, helper IdCreatorHelper) IdCreater {
return idg return idg
} }
func (idg *idCreater) GetId(idparts ...interface{}) string { func (idg *idCreator) backgroudTaskForGenerateRange() {
var prefix = idg.Creator.CreatePrefix(idparts...) // rocover panic
for { defer func() {
if r := recover(); r != nil {
log.Warningf("backgroudTaskForGenerateRange panic: [%v]", r)
}
}()
// doing range generator
for i := 0; ; i++ {
logStr := ""
log.SetLogid(fmt.Sprintf("uidcreator%07d", i))
for prefix, queue := range idg.Ranges {
var qlen = queue.Len()
var timeoutMs = 1 * time.Millisecond
if qlen <= 0 {
timeoutMs = 1 * time.Millisecond
} else if qlen <= 2 {
timeoutMs = 50 * time.Millisecond
} else {
timeoutMs = -1000 * time.Millisecond
}
if int(timeoutMs) > 0 {
idg.createNewRangeWithErrorPanic(prefix)
time.Sleep(timeoutMs)
} else if int(timeoutMs) < 0 {
time.Sleep(-timeoutMs)
}
if i%50 == 0 {
logStr += fmt.Sprintf("queue-legth:(%s,%d);", prefix, queue.Len())
}
}
if i%50 == 0 {
log.Infof("backgroud idcreator info: %s", logStr)
}
time.Sleep(3000 * time.Millisecond)
}
}
func (idg *idCreator) GetId(idparts ...interface{}) string {
var retry int
var prefix = idg.Helper.CreatePrefix(idparts...)
for retry = 0; retry < 50; retry++ {
var id int64 = 0 var id int64 = 0
var cur = idg.getCurrentRange(prefix) var currange = idg.getCurrentRange(prefix)
log.Infof("success get current range:(%s, %d-%d)",
prefix, currange.Start, currange.End)
// Step 1. Optimistic[no lock] to add id in current range. // Step 1. Optimistic[no lock] to add id in current range.
for { for {
id = atomic.LoadInt64(&cur.Next) id = atomic.LoadInt64(&currange.Next)
if id > cur.End { if id > currange.End {
//next range //next range
currange.Status = RANGE_STATUS_USEOUT
break break
} }
if atomic.CompareAndSwapInt64(&cur.Next, id, id+1) { if atomic.CompareAndSwapInt64(&currange.Next, id, id+1) {
//success //success
break break
} }
} }
// Step 2. No lock add failed, try add id in next range. // Step 2. Presious [no lock] adding failed, try add id in next range.
if id > cur.End { if id > currange.End {
//next range //next range
idg.turnToNextRange(cur) log.Infof("failed get id from current range, try next")
idg.turnToNextRange(currange)
continue continue
} }
// Step 3. Success, return mixed id. // Step 3. Success, return mixed id.
return idg.MixId(cur, id) log.Infof("success get id(not miexed):(%s, %d)", prefix, id)
return idg.Helper.CreateMixedId(prefix, id, idparts...)
} }
log.Errorf("getid from range retry too much time(%d)", retry)
panic("getid from range retry too much time")
} }
// Range Range define the range [start, end] of ids. func (idg *idCreator) createDefaultPrefix() (prefix string, err error) {
type Range struct {
Id int64 // id
Status int // 0:invlid; 1:ok; 2:used
Prefix string // prefix
Start int64 // begin id of this range(include)
End int64 // end id of this range(include)
Length int64 // lenght of this range
}
func (idg *idCreater) createDefaultPrefix() (prefix string, err error) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
err = fmt.Errorf("default prefix not surport by user") err = fmt.Errorf("default prefix not surport by user")
@ -137,94 +181,139 @@ func (idg *idCreater) createDefaultPrefix() (prefix string, err error) {
return prefix, err return prefix, err
} }
func (idg *idCreater) createNewRange(prefix string) error { func (idg *idCreator) createNewRangeWithErrorPanic(prefix string) {
// Step 1. create range from db or something else. if err := idg.createNewRange(prefix); err != nil {
start, length, err := idg.Creator.CreateRange(idg.Type, prefix) panic("create new range error:" + err.Error())
if err != nil {
return fmt.Errorf(
"idCreater '%s' generate range err:'%s'",
idg.Type, err.Error())
} }
}
// Step 2. lock generator. func (idg *idCreator) removeOldRangeWithErrorPanic(prefix string) {
if err := idg.removeOldRange(prefix); err != nil {
panic("remove old range error:" + err.Error())
}
}
func (idg *idCreator) createNewRange(prefix string) error {
// Step 1. lock generator.
idg.lock.Lock() idg.lock.Lock()
defer idg.lock.Unlock() defer idg.lock.Unlock()
lock, ok := idg.RangesLocks[prefix] if _, ok := idg.RangesLocks[prefix]; !ok {
if !ok { idg.Ranges[prefix] = &rangeQueue{Deque: *deque.New[*Range]()}
lock = &sync.Mutex{} idg.RangesLocks[prefix] = &sync.Mutex{}
queue = deque.New[*Range]()
idg.Ranges[prefix] = queue
idg.RangesLocks[prefix] = lock
} }
var queue = idg.Ranges[prefix]
var lock = idg.RangesLocks[prefix]
lock.Lock() lock.Lock()
defer lock.Unlock() defer lock.Unlock()
var queueLen = queue.Len()
if queueLen >= 30 {
log.Infof("ignore createNewRange beacuse of queue is long enough(%d)", queueLen)
return nil
}
// Step 2. create range from db or something else.
start, length, err := idg.Helper.CreateRange(prefix)
if err != nil {
str := fmt.Sprintf(
"idCreator '%s' generate range err:'%s'",
idg.Type, err.Error())
log.Errorf("%s", str)
return errors.New(str)
}
// Step 3. push to ranges queue. // Step 3. push to ranges queue.
queue.PushBack( var r = &Range{
&Range{ Next: start,
Id: 0, Status: RANGE_STATUS_OK,
Status: 1, Prefix: prefix,
Prefix: prefix, Start: start,
Start: start, End: start + int64(length) - 1,
End: start + length - 1, Length: length,
Length: length, }
}) queue.PushBack(r)
log.Infof("success createNewRange (%s, %d-%d)", prefix, start, r.End)
return nil return nil
} }
func (idg *idCreater) getCurrentRange(prefix string) (r *Range) { func (idg *idCreator) removeOldRange(prefix string) error {
if lock, ok := idg.RangesLocks[prefix]; ok {
lock.Lock()
defer lock.Unlock()
queue := idg.Ranges[prefix]
for queue.Len() > 0 {
r := queue.Front()
if r.Status == RANGE_STATUS_OK {
log.Infof("success remove all useout ranges for prefix:%s", prefix)
return nil
} else if r.Status == RANGE_STATUS_USEOUT {
if rr := queue.PopFront(); rr != r {
panic("queue head is changed, maybe somethong bug")
}
log.Infof("success remove old range:(%s,%d-%d), try remove next",
prefix, r.Start, r.End)
} else {
panic("invalid status, maybe somethong bug")
}
}
} else {
log.Infof("rare case: rangequeue:(%s) have removed by others", prefix)
}
return nil
}
func (idg *idCreator) getCurrentRange(prefix string) (r *Range) {
var (
ok = false
lock *sync.Mutex = nil
queue *rangeQueue = nil
)
// Step 0. get ranges queue by prefix. // Step 0. get ranges queue by prefix.
var queue, ok = idg.Ranges[prefix] for trycount := 0; !ok && trycount < 3; trycount++ {
if !ok { if queue, ok = idg.Ranges[prefix]; ok {
lock = idg.RangesLocks[prefix]
break
}
// try create new range
idg.createNewRange(prefix) idg.createNewRange(prefix)
} }
var lock, ok = idg.RangesLocks[prefix]
if !ok { if !ok {
panic("unreachable code: lock is not exist") panic("maybe something bug or too many pressures," +
"cannot found range queue after createNewRange retry 3 times")
} }
lock.Lock()
defer lock.Unlock()
// Step 1. try read from queue directly. // Step 1. try read from queue directly.
lock.Lock()
if queue.Len() >= 1 { if queue.Len() >= 1 {
return queue.Front() r = queue.Front()
lock.Unlock()
return r
} }
lock.Unlock()
// Step 2. try create a new range & try again. // Step 2. retry create a new range & try again.
idg.createNewRange(prefix) idg.createNewRange(prefix)
lock.Lock()
defer lock.Unlock()
if queue.Len() >= 1 { if queue.Len() >= 1 {
return queue.Front() r = queue.Front()
return r
} }
panic("get current range failed, too many pressure???") panic("get current range failed, too many pressure???")
} }
func (idg *idCreater) turnToNextRange(r *Range) { func (idg *idCreator) turnToNextRange(r *Range) {
// We donot lock the range, because there is no too much // We donot lock the range, because there is no too much
// side effect even if 'createNewRange' execute twice. // side effect even if 'createNewRange' execute twice.
switch r.Status { switch r.Status {
case 1: case RANGE_STATUS_USEOUT:
r.Status = 2 idg.createNewRangeWithErrorPanic(r.Prefix)
idg.createNewRange(r.Prefix) idg.removeOldRangeWithErrorPanic(r.Prefix)
case 2: case RANGE_STATUS_OK:
//have turn by others, we do nothing. // have turn by others, we do nothing.
log.Debugf("have turn by others, we do nothing.")
default: default:
panic("invalid status, maybe somethong bug") panic("invalid status, maybe somethong bug")
} }
} }
// demo demo range creator, return current timestamp % 86400 * lenght.
// **DONOT** use in product enviroment.
// **DONOT** use in product enviroment.
// **DONOT** use in product enviroment.
type demo struct{}
func (d *demo) GetRange(typ, prefix string) (start, length int64, err error) {
var curdate string = time.Now().Format("20060102")
if prefix != curdate {
return 0, 0, fmt.Errorf("demo just support prefix is current date")
}
start = uint64((time.Now().Unix() % 86400) * 10000)
length = 10000
return start, length, nil
}