From 66a8d078a07a69c6459d98b172dca098762a3925 Mon Sep 17 00:00:00 2001 From: bryanqiu Date: Tue, 28 May 2024 15:11:20 +0800 Subject: [PATCH] fix: new name conflict with other stream --- go.mod | 11 +++++++++ redis_test.go | 38 +++++++++++++++++++++++++++++ xstream/reader.go | 61 ++++++++++++++++++++++++++++------------------- 3 files changed, 86 insertions(+), 24 deletions(-) create mode 100644 redis_test.go diff --git a/go.mod b/go.mod index fccf60c..630d130 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,14 @@ module qoobing.com/gomod/redis go 1.18 + +require ( + github.com/gomodule/redigo v1.9.2 + qoobing.com/gomod/log v1.2.8 + qoobing.com/gomod/str v1.0.5 +) + +require ( + github.com/tylerb/gls v0.0.0-20150407001822-e606233f194d // indirect + github.com/tylerb/is v2.1.4+incompatible // indirect +) diff --git a/redis_test.go b/redis_test.go new file mode 100644 index 0000000..5e2c964 --- /dev/null +++ b/redis_test.go @@ -0,0 +1,38 @@ +package redis + +import ( + "fmt" + "log" + "testing" + + "qoobing.com/gomod/redis/sentinel" + "qoobing.com/gomod/redis/xstream" +) + +func TestNewConsumer(t *testing.T) { + cfg := xstream.Config{ + Group: "group_a", + Stream: "testxstream_queue", + Sentinel: sentinel.Config{ + Debug: true, //调试开关(会在日志打印REDIS语句) + MasterName: "walletmaster", //REDIS主名称(一个哨兵集群可管理多个REDIS主从结构) + Username: "web3wallet", //REDIS连接用户名 + Password: "web3onthemoon@2023#dev", //REDIS连接用户密码 + Sentinels: "sentinel-0.redis:5000", //哨兵节点列表,逗号分隔,一般配置三个 + SentinelUsername: "sentinel", //哨兵节点连接用户名,不填默认default + SentinelPassword: "password@mhxzkhl#dev", //哨兵节点连接密码,不填认为集群无密码 + }, + } + + for i := 0; i < 15; i++ { + i := i + name := fmt.Sprintf("name-%d", i) + t.Run(name, func(t *testing.T) { + t.Parallel() + log.Println("name", name) + if r := xstream.NewReader(cfg); r == nil { + t.Error("create new reader failed") + } + }) + } +} diff --git a/xstream/reader.go b/xstream/reader.go index 6e65e0c..daeed17 100644 --- a/xstream/reader.go +++ b/xstream/reader.go @@ -144,7 +144,7 @@ func NewReader(cfg Config) (r *xstreamReader) { restart: true, } r.id = str.GetRandomNumString(12) - r.name = newName(r.conn, r.id, cfg.Group) + r.name = newName(r.conn, r.id, cfg.Group, cfg.Stream) if err := r.initGroup(); err != nil { panic(fmt.Sprintf("initGroup failed: error:%s", err)) } @@ -169,17 +169,18 @@ func NewReader(cfg Config) (r *xstreamReader) { return r } -func newName(conn redis.Conn, id, group string) string { +func newName(conn redis.Conn, id, group, xstream string) string { var ( mkeys = []any{} - consumer = fmt.Sprintf("%s-comsumer", group) + prefix = fmt.Sprintf("xstream[%s]-group[%s]-", xstream, group) + consumer = func(i int) string { return fmt.Sprintf("%s%03d", prefix, i) } consumers = map[string]string{} MAX_CONSUMER_NUM = 30 ) // get all name for i := 0; i < MAX_CONSUMER_NUM; i++ { - mkeys = append(mkeys, fmt.Sprintf("%s-%03d", consumer, i)) + mkeys = append(mkeys, consumer(i)) } ret, err := redis.Strings(conn.Do("MGET", mkeys...)) if err != nil { @@ -188,38 +189,50 @@ func newName(conn redis.Conn, id, group string) string { } for i, s := range ret { if s != "" { - name := fmt.Sprintf("%s-%03d", consumer, i) + name := consumer(i) consumers[name] = s - log.Infof("name:[%s]", name) } } + var ( + i = 0 + retry = 0 + consumer_i = "" + ) +LOOP: // get unused name - for i := 0; i < MAX_CONSUMER_NUM; i++ { - name := fmt.Sprintf("%s-%03d", consumer, i) - if _, ok := consumers[name]; !ok { - consumer = name + for ; i < MAX_CONSUMER_NUM; i++ { + if _, ok := consumers[consumer(i)]; !ok { + consumer_i = consumer(i) break } - if i == MAX_CONSUMER_NUM-1 { - panic("too many consumer or something bug") - - } + } + if i == MAX_CONSUMER_NUM { + panic(fmt.Sprintf("too many consumer(more than %d)???", MAX_CONSUMER_NUM)) } - // set to redis - success, err := redis.Int(conn.Do("SETNX", consumer, id)) - conn.Do("GETEX", consumer, "EX", 120) - if err != nil { - log.Warningf("redis SETNX(%s,%s) error: %s", consumer, id, err) - panic("redis SETNX error") + // set to redis (try lock) + success, err := redis.Int(conn.Do("SETNX", consumer_i, id)) + conn.Do("GETEX", consumer_i, "EX", 120) /* for SETNX failed forever */ + if err != nil && retry >= 2 { + log.Warningf("redis SETNX(%s,%s) error: %s", consumer_i, id, err) + panic(fmt.Sprintf("redis SETNX error after try [%d] times", retry)) + } + + // return or retry if something error + if err != nil /*&& retry < 2*/ { + retry++ + log.Warningf("redis SETNX(%s,%s) error: %s, retry(%d)", consumer_i, id, err, retry) + goto LOOP } if success == 0 { - log.Warningf("consumer name(%s) used by others", consumer) - panic("consumer name used by others") + i++ + retry++ + log.Warningf("consumer name(%s) used by others, retry(%d)", consumer_i, retry) + goto LOOP } - conn.Do("GETEX", consumer, "EX", 120) - return consumer + log.Infof("success to get new consumer-name:[%s]", consumer_i) + return consumer_i } func (r *xstreamReader) Close() {