// Package xstream implements a small reader for Redis streams that consumes
// messages through a consumer group: messages are fetched one at a time with
// XREADGROUP and acknowledged with XACK. Every reader registers a unique
// consumer name in Redis and keeps it alive with a periodic heartbeat.
package xstream

import (
	"context"
	"errors"
	"fmt"
	"net"
	"strings"
	"sync"
	"time"

	"github.com/gomodule/redigo/redis"
	"qoobing.com/gomod/log"
	"qoobing.com/gomod/redis/sentinel"
	"qoobing.com/gomod/str"
)

// -----------------------------------------------------------------------------
// Example
// -----------------------------------------------------------------------------
//
//	const (
//		CORESERVER_NEWACCOUNT_GROUP = "groupid-coreserver-newaccount"
//		CORESERVER_NEWACCOUNT_QUEUE = "coreserver.newaccount"
//	)
//
//	type NewAccountMessage struct {
//		Sign       string    `json:"sign"`
//		Appid      string    `json:"appid"`
//		Userid     int64     `json:"userid"`
//		Account    string    `json:"account"`
//		CreateTime time.Time `json:"createtime"`
//	}
//
//	func Run(ctx context.Context) {
//		var SLEEP_TIME = 10 * time.Second
//		go func() {
//			for {
//				log.Infof("start run newaccount marketing engine ...")
//				runNewAccountMarketingEngine(ctx)
//				log.Infof("newaccount marketing engine exit unexpected, retry %s later again", SLEEP_TIME.String())
//				time.Sleep(SLEEP_TIME)
//			}
//		}()
//	}
//
//	func runNewAccountMarketingEngine(ctx context.Context) {
//		// r read message from redis
//		cfg := config.Instance()
//		r := xstream.NewReader(xstream.Config{
//			Group:    CORESERVER_NEWACCOUNT_GROUP,
//			Stream:   CORESERVER_NEWACCOUNT_QUEUE,
//			Sentinel: cfg.CoreserverRedis,
//		})
//		defer r.Close()
//
//		// w write message to kafka
//		w := &kafka.Writer{
//			Addr:     kafka.TCP(cfg.MarketingKafka.Addresses...),
//			Balancer: &kafka.LeastBytes{},
//		}
//		defer w.Close()
//
//		var error_wait_time = 0 * time.Millisecond
//		for {
//			// Step 0. wait a moment if need
//			if error_wait_time > 0 {
//				time.Sleep(error_wait_time)
//			}
//
//			// Step 1. fetch message
//			var err error = nil
//			var msg xstream.Message
//			var newacctMsg NewAccountMessage
//			log.Infof("waiting fetch newaccount message from redis queue...")
//			if msg, err = r.FetchMessage(ctx); err != nil {
//				log.Errorf("failed fetch message from redis queue, err:%s", err)
//				//error_wait_time = 1000 * time.Millisecond
//				break
//			}
//
//			// Step 2. decode message
//			if err := json.Unmarshal(msg.Value, &newacctMsg); err != nil {
//				log.Errorf("failed json.unmarshal message[%s], err:%s", string(msg.Value), err)
//				continue
//			} else if newacctMsg.Userid == 0 || newacctMsg.Account == "" {
//				log.Errorf("invalid newaccount message, userid=[%d]", newacctMsg.Userid)
//				continue
//			}
//			log.Infof("fetch newaccount message success, account=[%s]", newacctMsg.Account)
//
//			// Step 3. handle message
//			// xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx
//			// xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx
//			// xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx
//			// xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx
//			// xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx
//
//			// Step 4. send ack to coreserver redis
//			if err := r.CommitMessages(ctx, msg); err != nil {
//				log.Errorf("failed commit message to redis, err:%s", err)
//				error_wait_time = 1000 * time.Millisecond
//				continue
//			}
//		}
//	}
//
// -----------------------------------------------------------------------------
// End of example
// -----------------------------------------------------------------------------

const (
	// maxConsumers is the number of consumer-name slots (suffix 000..029)
	// available per stream/group pair.
	maxConsumers = 30
	// consumerNameTTL is the expiry, in seconds, applied to a consumer-name
	// key every time it is touched with GETEX.
	consumerNameTTL = 120
	// heartbeatInterval is the pause between two heartbeats.
	heartbeatInterval = 15 * time.Second
	// readBlockMillis is the BLOCK argument passed to XREADGROUP
	// (864000000 ms, i.e. ten days).
	readBlockMillis = 864000000
)

var (
	// redisPoolsMu guards redisPools: NewReader may be called from several
	// goroutines and unsynchronized map writes crash the process.
	redisPoolsMu sync.Mutex
	// redisPools caches one connection pool per sentinel address string.
	redisPools = map[string]*redis.Pool{}
)

// Config describes which stream to read, through which consumer group, and
// how to reach Redis.
type Config struct {
	Group    string          `toml:"group"`
	Stream   string          `toml:"stream"`
	Sentinel sentinel.Config `toml:"sentinel"`
}

// Message is a single stream entry: its stream ID and the value of the first
// field/value pair stored in the entry.
type Message struct {
	Id    string
	Value []byte
}

// xstreamReader reads one stream through one consumer group.
//
// NOTE(review): the heartbeat goroutine calls Close, which issues commands on
// r.conn while another goroutine may be blocked on the same connection inside
// FetchMessage. redigo does not document concurrent Do/Close as safe; this
// design issue is left unchanged here and should be confirmed/reworked.
type xstreamReader struct {
	cfg      Config
	id       string // random token stored under the consumer-name key
	name     string // consumer name, also the Redis key holding id
	conn     redis.Conn
	connPool *redis.Pool
	restart  bool // true while the pending history ("0-0") is still being replayed

	done      chan struct{} // closed once by Close
	closeOnce sync.Once     // guards close(done)
}

// poolFor returns the shared pool for the given sentinel configuration,
// creating it on first use.
func poolFor(cfg sentinel.Config) *redis.Pool {
	redisPoolsMu.Lock()
	defer redisPoolsMu.Unlock()

	poolID := cfg.Sentinels
	pool, ok := redisPools[poolID]
	if !ok {
		pool = sentinel.NewPool(cfg)
		redisPools[poolID] = pool
	}
	return pool
}

// NewReader creates a reader for cfg.Stream in consumer group cfg.Group.
//
// It reserves a consumer name, makes sure the group exists and starts the
// heartbeat goroutine that keeps the name alive. It panics when no consumer
// name can be reserved or when the group cannot be initialized (the signature
// has no error result). Call Close to release the reader.
func NewReader(cfg Config) (r *xstreamReader) {
	pool := poolFor(cfg.Sentinel)
	r = &xstreamReader{
		cfg:      cfg,
		conn:     pool.Get(),
		connPool: pool,
		restart:  true,
		done:     make(chan struct{}),
	}
	r.id = str.GetRandomNumString(12)
	r.name = newName(r.conn, r.id, cfg.Group, cfg.Stream)
	if err := r.initGroup(); err != nil {
		// Give back the consumer name and the connection before bailing out.
		r.Close()
		panic(fmt.Sprintf("initGroup failed: error:%s", err))
	}

	go r.heartbeat()
	return r
}

// heartbeat refreshes the expiry of the consumer-name key on its own
// connection until the reader is closed. If the key cannot be read, or no
// longer holds this reader's id, the reader is closed.
func (r *xstreamReader) heartbeat() {
	beatconn := r.connPool.Get()
	defer beatconn.Close()

	ticker := time.NewTicker(heartbeatInterval)
	defer ticker.Stop()

	for !r.isClosed() {
		id, err := redis.String(beatconn.Do("GETEX", r.name, "EX", consumerNameTTL))
		switch {
		case err != nil:
			log.Warningf("redis GETEX failed, err:%s", err)
			r.Close()
			return
		case id != r.id:
			log.Warningf("name id in redis (%s) not equal to local name id(%s)", id, r.id)
			r.Close()
			return
		}

		// Wake up early on Close instead of sleeping out the interval.
		select {
		case <-r.done:
			return
		case <-ticker.C:
		}
	}
}

// newName reserves an unused consumer name of the form
// "xstream[<stream>]-group[<group>]-NNN" by storing id under it with SETNX,
// and returns the name. It panics if MGET fails, if SETNX keeps failing, or
// if all maxConsumers slots are taken.
func newName(conn redis.Conn, id, group, xstream string) string {
	prefix := fmt.Sprintf("xstream[%s]-group[%s]-", xstream, group)
	consumer := func(i int) string { return fmt.Sprintf("%s%03d", prefix, i) }

	// Look up every slot at once; an empty string means the slot is free.
	keys := make([]any, 0, maxConsumers)
	for i := 0; i < maxConsumers; i++ {
		keys = append(keys, consumer(i))
	}
	owners, err := redis.Strings(conn.Do("MGET", keys...))
	if err != nil {
		log.Warningf("redis MGET error: %s", err)
		panic("redis MGET error")
	}

	retry := 0
	for i := 0; i < maxConsumers; {
		if i < len(owners) && owners[i] != "" {
			i++
			continue
		}
		name := consumer(i)

		// Try to take the slot.
		success, err := redis.Int(conn.Do("SETNX", name, id))
		// Always (re)arm the expiry, result deliberately ignored: a key left
		// behind without TTL would otherwise make SETNX fail forever.
		conn.Do("GETEX", name, "EX", consumerNameTTL)

		switch {
		case err != nil && retry >= 2:
			log.Warningf("redis SETNX(%s,%s) error: %s", name, id, err)
			panic(fmt.Sprintf("redis SETNX error after try [%d] times", retry))
		case err != nil:
			// Retry the same slot.
			retry++
			log.Warningf("redis SETNX(%s,%s) error: %s, retry(%d)", name, id, err, retry)
		case success == 0:
			// Somebody else grabbed it since MGET; move on to the next slot.
			i++
			retry++
			log.Warningf("consumer name(%s) used by others, retry(%d)", name, retry)
		default:
			log.Infof("success to get new consumer-name:[%s]", name)
			return name
		}
	}
	panic(fmt.Sprintf("too many consumer(more than %d)???", maxConsumers))
}

// Close releases the consumer name (only if it still holds this reader's id),
// marks the reader closed and returns its connection to the pool. It may be
// called more than once.
func (r *xstreamReader) Close() {
	// Delete the consumer name, but never one that now belongs to someone else.
	id, err := redis.String(r.redisDo("GETEX", r.name, "EX", consumerNameTTL))
	if err == nil && id == r.id {
		if _, err := r.redisDo("DEL", r.name); err != nil {
			log.Warningf("redis DEL(%s) failed, err:%s", r.name, err)
		}
	}

	// Signal closure exactly once, then hand the connection back to the pool.
	r.closeOnce.Do(func() { close(r.done) })
	r.conn.Close()
}

// isClosed reports whether Close has been called.
func (r *xstreamReader) isClosed() bool {
	select {
	case <-r.done:
		return true
	default:
		return false
	}
}

// fetchMessage issues one blocking XREADGROUP (COUNT 1) and returns the raw
// reply, which on success always holds exactly one stream element. While
// r.restart is set it reads this consumer's pending history ("0-0"),
// afterwards only new messages (">").
func (r *xstreamReader) fetchMessage() (ret []any, err error) {
	startID := ">"
	if r.restart {
		startID = "0-0"
	}

	ret, err = redis.Values(r.redisDo("XREADGROUP",
		"GROUP", r.cfg.Group, r.name,
		"COUNT", 1,
		"BLOCK", readBlockMillis,
		"STREAMS", r.cfg.Stream, startID,
	))
	if err != nil {
		return nil, err
	}
	if len(ret) != 1 {
		// One stream was requested, so anything else is a malformed reply.
		return nil, fmt.Errorf("xreadgroup returned %d streams, want 1", len(ret))
	}
	return ret, nil
}

// FetchMessage blocks until one message is available and returns it.
//
// On the first calls it replays messages that were delivered to this consumer
// name but never acknowledged; once that history is empty it switches to new
// messages. The message must be acknowledged with CommitMessages.
//
// ctx is only checked before the call starts blocking: an already cancelled
// context makes FetchMessage return ctx.Err() immediately. A malformed reply
// yields an error instead of a panic.
func (r *xstreamReader) FetchMessage(ctx context.Context) (msg Message, err error) {
	if ctx != nil {
		if cerr := ctx.Err(); cerr != nil {
			return Message{}, cerr
		}
	}

	// Step 1. get result from redis xstream
	var entries []any
	for {
		// reply format example:
		//   [["coreserver.txneedrepair", [["1675180029481-0", ["a", "b"]]]]]
		reply, ferr := r.fetchMessage()
		if ferr != nil {
			log.Warningf("fetchMessage meassage err:%s", ferr)
			return Message{}, fmt.Errorf("fetchMessage meassage err: %w", ferr)
		}

		var perr error
		if entries, perr = streamEntries(reply); perr != nil {
			log.Warningf("fetchMessage meassage err:%s", perr)
			return Message{}, perr
		}

		if len(entries) == 0 && r.restart {
			// Pending history exhausted: switch to new messages and read again.
			r.restart = false
			continue
		}
		if len(entries) != 1 {
			log.Warningf("meassage length from xstream is not 1(but %d)", len(entries))
			return Message{}, errors.New("no new msg")
		}
		break
	}

	// Step 2. parse xstream item result
	msg, err = parseEntry(entries[0])
	if err != nil {
		log.Warningf("fetchMessage meassage err:%s", err)
		return Message{}, err
	}
	log.Infof("success fetched meassage '%s': %s", msg.Id, string(msg.Value))
	return msg, nil
}

// streamEntries extracts the entry list from an XREADGROUP reply of the form
// [[stream, [entry, ...]]]. A nil entry list is treated as empty.
func streamEntries(reply []any) ([]any, error) {
	if len(reply) == 0 {
		return nil, errors.New("redis fetchMessage ret length is 0")
	}
	stream, err := redis.Values(reply[0], nil) // [stream, [entry, ...]]
	if err != nil {
		return nil, fmt.Errorf("unexpected xstream reply: %w", err)
	}
	if len(stream) < 2 {
		return nil, fmt.Errorf("unexpected xstream reply: stream element has %d items, want 2", len(stream))
	}
	entries, err := redis.Values(stream[1], nil) // [entry, ...]
	if err != nil && !errors.Is(err, redis.ErrNil) {
		return nil, fmt.Errorf("unexpected xstream reply: %w", err)
	}
	return entries, nil
}

// parseEntry converts one stream entry of the form [id, [field, value, ...]]
// into a Message carrying the id and the first value.
func parseEntry(entry any) (Message, error) {
	item, err := redis.Values(entry, nil) // ["1675180029481-0", ["a", "b"]]
	if err != nil {
		return Message{}, fmt.Errorf("unexpected xstream entry: %w", err)
	}
	if len(item) < 2 {
		return Message{}, fmt.Errorf("unexpected xstream entry: has %d items, want 2", len(item))
	}
	id, err := redis.String(item[0], nil) // "1675180029481-0"
	if err != nil {
		return Message{}, fmt.Errorf("unexpected xstream entry id: %w", err)
	}
	// The field list is nil when the entry is still pending but its data was
	// removed from the stream (see the XREADGROUP documentation).
	fields, err := redis.Values(item[1], nil) // ["a", "b"]
	if err != nil {
		return Message{}, fmt.Errorf("xstream entry %s has no fields: %w", id, err)
	}
	if len(fields) < 2 {
		return Message{}, fmt.Errorf("xstream entry %s has %d field items, want at least 2", id, len(fields))
	}
	value, err := redis.Bytes(fields[1], nil) // "b"
	if err != nil {
		return Message{}, fmt.Errorf("xstream entry %s has unexpected value: %w", id, err)
	}
	return Message{Id: id, Value: value}, nil
}

// CommitMessages acknowledges msg with XACK. An acknowledged count other than
// one is logged but still treated as success. ctx is currently unused.
func (r *xstreamReader) CommitMessages(ctx context.Context, msg Message) (err error) {
	cnt, err := redis.Int(r.redisDo("XACK", r.cfg.Stream, r.cfg.Group, msg.Id))
	if err != nil {
		return err
	}
	if cnt != 1 {
		log.Warningf("attention: 'XACK' return %d (not 1), but we trade as success", cnt)
	}
	return nil
}

// redisDo runs a command on the reader's connection. It returns net.ErrClosed
// once the reader is closed, and swaps in a fresh pooled connection when the
// current one reports an error.
func (r *xstreamReader) redisDo(cmd string, args ...any) (reply any, err error) {
	if r.isClosed() {
		return nil, net.ErrClosed
	}
	if r.conn.Err() != nil {
		log.Warningf("redis connection closed/unavaliable, we will get new one from pool")
		stale := r.conn
		r.conn = r.connPool.Get()
		stale.Close()
	}
	return r.conn.Do(cmd, args...)
}

// initGroup makes sure the consumer group exists on the stream, creating the
// group (and the stream, via MKSTREAM) when XINFO GROUPS does not list it.
func (r *xstreamReader) initGroup() error {
	groups, err := redis.Values(r.redisDo("XINFO", "GROUPS", r.cfg.Stream))
	if err == nil {
		for _, group := range groups {
			// Each group is a flat list whose second item is read as the
			// group name; skip anything that does not have that shape.
			fields, ferr := redis.Values(group, nil)
			if ferr != nil || len(fields) < 2 {
				continue
			}
			name, nerr := redis.String(fields[1], nil)
			if nerr != nil {
				continue
			}
			log.Infof("group name: %s", name)
			if name == r.cfg.Group {
				return nil
			}
		}
	}

	// XINFO failed (e.g. the stream does not exist yet) or the group is not
	// listed: create it.
	log.Infof("create group, name: %s", r.cfg.Group)
	_, err = r.redisDo("XGROUP", "CREATE", r.cfg.Stream, r.cfg.Group, "$", "MKSTREAM")
	if err != nil && strings.HasPrefix(err.Error(), "BUSYGROUP") {
		// Another reader created the group in the meantime; that is fine.
		return nil
	}
	return err
}