diff --git a/xstream/reader.go b/xstream/reader.go new file mode 100644 index 0000000..818fa1a --- /dev/null +++ b/xstream/reader.go @@ -0,0 +1,269 @@ +package xstream + +import ( + "errors" + "fmt" + "net" + "time" + + "github.com/garyburd/redigo/redis" + "qoobing.com/gomod/log" + "qoobing.com/gomod/redis/sentinel" + "qoobing.com/gomod/str" +) + +var ( + redisPools = map[string]*redis.Pool{} +) + +type Config struct { + stream string + group string + sentinelConfig sentinel.Config +} + +type xstreamMessage struct { + Id string + Value []byte +} + +type xstreamReader struct { + cfg Config + id string + name string + conn redis.Conn + connPool *redis.Pool + closed bool + restart bool +} + +func NewReader(cfg Config) (r *xstreamReader) { + // redis pool + poolId := cfg.sentinelConfig.Sentinels + if _, ok := redisPools[poolId]; !ok { + redisPools[poolId] = sentinel.NewPool(cfg.sentinelConfig) + } + + // initialize + connPool := redisPools[poolId] + r = &xstreamReader{ + cfg: cfg, + conn: connPool.Get(), + connPool: connPool, + closed: false, + restart: true, + } + r.id = str.GetRandomNumString(12) + r.name = newName(r.conn, r.id, cfg.group) + if err := r.initGroup(); err != nil { + panic(fmt.Sprintf("initGroup failed: error:%s", err)) + } + + // heart beat + go func() { + beatconn := redisPools[poolId].Get() + defer beatconn.Close() + for !r.closed { + id, err := redis.String(beatconn.Do("GETEX", r.name, "EX", 120)) + if err != nil { + log.Warningf("redis GETEX failed, err:%s", err) + r.Close() + } else if id != r.id { + log.Warningf("name id in redis (%s) not equal to local name id(%s)", id, r.id) + r.Close() + } + time.Sleep(15 * time.Second) + } + }() + + return r +} + +func newName(conn redis.Conn, id, group string) string { + var ( + mkeys = []any{} + consumer = fmt.Sprintf("%s-comsumer", group) + consumers = map[string]string{} + MAX_CONSUMER_NUM = 30 + ) + + // get all name + for i := 0; i < MAX_CONSUMER_NUM; i++ { + mkeys = append(mkeys, fmt.Sprintf("%s-%03d", consumer, i)) + } + ret, err := redis.Strings(conn.Do("MGET", mkeys...)) + if err != nil { + log.Warningf("redis MGET error: %s", err) + panic("redis MGET error") + } + for i, s := range ret { + if s != "" { + name := fmt.Sprintf("%s-%03d", consumer, i) + consumers[name] = s + log.Infof("name:[%s]", name) + } + } + + // get unused name + for i := 0; i < MAX_CONSUMER_NUM; i++ { + name := fmt.Sprintf("%s-%03d", consumer, i) + if _, ok := consumers[name]; !ok { + consumer = name + break + } + if i == MAX_CONSUMER_NUM-1 { + panic("too many consumer or something bug") + + } + } + + // set to redis + success, err := redis.Int(conn.Do("SETNX", consumer, id)) + conn.Do("GETEX", consumer, "EX", 120) + if err != nil { + log.Warningf("redis SETNX(%s,%s) error: %s", consumer, id, err) + panic("redis SETNX error") + } + if success == 0 { + log.Warningf("consumer name(%s) used by others", consumer) + panic("consumer name used by others") + } + conn.Do("GETEX", consumer, "EX", 120) + return consumer +} + +func (r *xstreamReader) Close() { + // delete client consumer name + id, err := redis.String(r.redisDo("GETEX", r.name, "EX", 120)) + if err == nil && id == r.id { + r.redisDo("DEL", r.name) + } + + // clonse redis connection(put to pool) + r.closed = true + r.conn.Close() +} + +func (r *xstreamReader) fetchMessage() (ret []any, err error) { + // func (r *xstreamReader) fetchMessage(ctx context.Context) (ret []any, err error) { + args := []interface{}{ + "GROUP", r.cfg.group, r.name, + "COUNT", 1, + "BLOCK", 864000000, + "STREAMS", r.cfg.stream, + } + if r.restart { + args = append(args, "0-0") + } else { + args = append(args, ">") + } + + ret, err = redis.Values(r.redisDo("XREADGROUP", args...)) + + if err != nil { + return ret, err + } else if ret == nil { + return ret, nil + } + if len(ret) != 1 { + panic("len(ret) is not 1") + } + return ret, nil +} + +func (r *xstreamReader) FetchMessage() (msg xstreamMessage, err error) { + // func (r *xstreamReader) FetchMessage(ctx context.Context) (msg xstreamMessage, err error) { + // Step 1. get result from redis xstream + var ret, qmsg, msgs []any = nil, nil, nil + for { + ret, err = r.fetchMessage() + ////////////////////////////////////////////////////////////////////// + ////ret format example: + //// [["coreserver.txneedrepair", [["1675180029481-0", ["a", "b"]]]]] + ////////////////////////////////////////////////////////////////////// + if err != nil { + log.Warningf("fetchMessage meassage err:%s", err) + return msg, errors.New("fetchMessage meassage err") + } + if len(ret) == 0 { + log.Warningf("redis stream result length is 0, which shoud bigger than 0") + return msg, errors.New("redis fetchMessage ret length is 0") + } + + qmsg, _ = redis.Values(ret[0], nil) // ["coreserver.txneedrepair", [["1675180029481-0", ["a", "b"]]]] + msgs, _ = redis.Values(qmsg[1], nil) // [["1675180029481-0", ["a", "b"]]] + + if len(msgs) == 0 && r.restart { + r.restart = false + continue + } + if len(msgs) != 1 { + log.Warningf("meassage length from xstream is not 1(but %d)", len(msgs)) + return msg, errors.New("no new msg") + } + break + } + + // Step 2. parse xstream item result + var ( + amsg, _ = redis.Values(msgs[0], nil) // ["1675180029481-0", ["a", "b"]] + rmsg, _ = redis.Values(amsg[1], nil) // ["a", "b"] + msgid, _ = redis.String(amsg[0], nil) // "1675180029481-0" + msgval, _ = redis.String(rmsg[1], nil) // "b" + ) + msg.Id = msgid + msg.Value = []byte(msgval) + log.Infof("success fetched meassage '%s': %s", msg.Id, msgval) + return msg, nil +} + +func (r *xstreamReader) CommitMessages(msg xstreamMessage) (err error) { + args := []interface{}{ + r.cfg.stream, // key + r.cfg.group, // group + msg.Id, // id + } + + if cnt, err := redis.Int(r.redisDo("XACK", args...)); err != nil { + return err + } else if cnt != 1 { + log.Warningf("attention: 'XACK' return %d (not 1), but we trade as success", cnt) + } + return nil +} + +func (r *xstreamReader) redisDo(cmd string, args ...any) (reply any, err error) { + if r.closed { + return nil, net.ErrClosed + } else if r.conn.Err() != nil { + log.Warningf("redis connection closed/unavaliable, we will get new one from pool") + oldconn := r.conn + newconn := r.connPool.Get() + r.conn = newconn + oldconn.Close() + } + + return r.conn.Do(cmd, args...) +} + +func (r *xstreamReader) initGroup() error { + groups, err := r.redisDo("XINfO", "GROUPS", r.cfg.stream) + groupInfosInf, _ := groups.([]interface{}) + if err != nil || len(groupInfosInf) == 0 { + log.Infof("create group, name: %s", r.cfg.group) + _, err = r.redisDo("XGROUP", "CREATE", r.cfg.stream, r.cfg.group, "$", "MKSTREAM") + return err + } + + for _, groupInfo := range groupInfosInf { + groupInfoInf := groupInfo.([]interface{}) + groupName := string(groupInfoInf[1].([]uint8)) + log.Infof("group name: %s", groupName) + if groupName == r.cfg.group { + return nil + } + } + log.Infof("create group, name: %s", r.cfg.group) + _, err = r.redisDo("XGROUP", "CREATE", r.cfg.stream, r.cfg.group, "$", "MKSTREAM") + return err +} diff --git a/xstream/writer.go b/xstream/writer.go new file mode 100644 index 0000000..9905a82 --- /dev/null +++ b/xstream/writer.go @@ -0,0 +1,20 @@ +package xstream + +//// func Example() { +//// var ( +//// m = model.NewModelDefault() +//// queue = "coreserver.newaccount" +//// msgval = map[string]interface{}{ +//// "sign": "", +//// "appid": c.AppId, +//// "userid": c.UserId, +//// "account": input.Address, +//// } +//// msgstr, _ = json.Marshal(msgval) +//// ) +//// defer m.Close() +//// +//// if _, err = m.Redis.Do("XADD", queue, "*", "msg", msgstr); err != nil { +//// log.Warningf("!!!!FAILED add newaccount to redis queue, msg is:'%s'", msgstr) +//// } +//// }