fix: new name conflict with other stream

This commit is contained in:
bryanqiu 2024-05-28 15:11:20 +08:00
parent 4089e4d6b7
commit 66a8d078a0
3 changed files with 86 additions and 24 deletions

11
go.mod
View File

@ -1,3 +1,14 @@
module qoobing.com/gomod/redis module qoobing.com/gomod/redis
go 1.18 go 1.18
require (
github.com/gomodule/redigo v1.9.2
qoobing.com/gomod/log v1.2.8
qoobing.com/gomod/str v1.0.5
)
require (
github.com/tylerb/gls v0.0.0-20150407001822-e606233f194d // indirect
github.com/tylerb/is v2.1.4+incompatible // indirect
)

38
redis_test.go Normal file
View File

@ -0,0 +1,38 @@
package redis
import (
"fmt"
"log"
"testing"
"qoobing.com/gomod/redis/sentinel"
"qoobing.com/gomod/redis/xstream"
)
func TestNewConsumer(t *testing.T) {
cfg := xstream.Config{
Group: "group_a",
Stream: "testxstream_queue",
Sentinel: sentinel.Config{
Debug: true, //调试开关会在日志打印REDIS语句)
MasterName: "walletmaster", //REDIS主名称一个哨兵集群可管理多个REDIS主从结构
Username: "web3wallet", //REDIS连接用户名
Password: "web3onthemoon@2023#dev", //REDIS连接用户密码
Sentinels: "sentinel-0.redis:5000", //哨兵节点列表,逗号分隔,一般配置三个
SentinelUsername: "sentinel", //哨兵节点连接用户名不填默认default
SentinelPassword: "password@mhxzkhl#dev", //哨兵节点连接密码,不填认为集群无密码
},
}
for i := 0; i < 15; i++ {
i := i
name := fmt.Sprintf("name-%d", i)
t.Run(name, func(t *testing.T) {
t.Parallel()
log.Println("name", name)
if r := xstream.NewReader(cfg); r == nil {
t.Error("create new reader failed")
}
})
}
}

View File

@ -144,7 +144,7 @@ func NewReader(cfg Config) (r *xstreamReader) {
restart: true, restart: true,
} }
r.id = str.GetRandomNumString(12) r.id = str.GetRandomNumString(12)
r.name = newName(r.conn, r.id, cfg.Group) r.name = newName(r.conn, r.id, cfg.Group, cfg.Stream)
if err := r.initGroup(); err != nil { if err := r.initGroup(); err != nil {
panic(fmt.Sprintf("initGroup failed: error:%s", err)) panic(fmt.Sprintf("initGroup failed: error:%s", err))
} }
@ -169,17 +169,18 @@ func NewReader(cfg Config) (r *xstreamReader) {
return r return r
} }
func newName(conn redis.Conn, id, group string) string { func newName(conn redis.Conn, id, group, xstream string) string {
var ( var (
mkeys = []any{} mkeys = []any{}
consumer = fmt.Sprintf("%s-comsumer", group) prefix = fmt.Sprintf("xstream[%s]-group[%s]-", xstream, group)
consumer = func(i int) string { return fmt.Sprintf("%s%03d", prefix, i) }
consumers = map[string]string{} consumers = map[string]string{}
MAX_CONSUMER_NUM = 30 MAX_CONSUMER_NUM = 30
) )
// get all name // get all name
for i := 0; i < MAX_CONSUMER_NUM; i++ { for i := 0; i < MAX_CONSUMER_NUM; i++ {
mkeys = append(mkeys, fmt.Sprintf("%s-%03d", consumer, i)) mkeys = append(mkeys, consumer(i))
} }
ret, err := redis.Strings(conn.Do("MGET", mkeys...)) ret, err := redis.Strings(conn.Do("MGET", mkeys...))
if err != nil { if err != nil {
@ -188,38 +189,50 @@ func newName(conn redis.Conn, id, group string) string {
} }
for i, s := range ret { for i, s := range ret {
if s != "" { if s != "" {
name := fmt.Sprintf("%s-%03d", consumer, i) name := consumer(i)
consumers[name] = s consumers[name] = s
log.Infof("name:[%s]", name)
} }
} }
var (
i = 0
retry = 0
consumer_i = ""
)
LOOP:
// get unused name // get unused name
for i := 0; i < MAX_CONSUMER_NUM; i++ { for ; i < MAX_CONSUMER_NUM; i++ {
name := fmt.Sprintf("%s-%03d", consumer, i) if _, ok := consumers[consumer(i)]; !ok {
if _, ok := consumers[name]; !ok { consumer_i = consumer(i)
consumer = name
break break
} }
if i == MAX_CONSUMER_NUM-1 {
panic("too many consumer or something bug")
} }
if i == MAX_CONSUMER_NUM {
panic(fmt.Sprintf("too many consumer(more than %d)???", MAX_CONSUMER_NUM))
} }
// set to redis // set to redis (try lock)
success, err := redis.Int(conn.Do("SETNX", consumer, id)) success, err := redis.Int(conn.Do("SETNX", consumer_i, id))
conn.Do("GETEX", consumer, "EX", 120) conn.Do("GETEX", consumer_i, "EX", 120) /* for SETNX failed forever */
if err != nil { if err != nil && retry >= 2 {
log.Warningf("redis SETNX(%s,%s) error: %s", consumer, id, err) log.Warningf("redis SETNX(%s,%s) error: %s", consumer_i, id, err)
panic("redis SETNX error") panic(fmt.Sprintf("redis SETNX error after try [%d] times", retry))
}
// return or retry if something error
if err != nil /*&& retry < 2*/ {
retry++
log.Warningf("redis SETNX(%s,%s) error: %s, retry(%d)", consumer_i, id, err, retry)
goto LOOP
} }
if success == 0 { if success == 0 {
log.Warningf("consumer name(%s) used by others", consumer) i++
panic("consumer name used by others") retry++
log.Warningf("consumer name(%s) used by others, retry(%d)", consumer_i, retry)
goto LOOP
} }
conn.Do("GETEX", consumer, "EX", 120) log.Infof("success to get new consumer-name:[%s]", consumer_i)
return consumer return consumer_i
} }
func (r *xstreamReader) Close() { func (r *xstreamReader) Close() {