From ee6659ccb1b74e9e6dc7888dcd6d7abbfd65aca6 Mon Sep 17 00:00:00 2001 From: bryanqiu Date: Fri, 24 Nov 2023 11:33:58 +0800 Subject: [PATCH] debug --- go.mod | 2 +- xstream/reader.go | 84 ++++++++++++++++++++++++++++------------------- 2 files changed, 52 insertions(+), 34 deletions(-) diff --git a/go.mod b/go.mod index f84bc95..fccf60c 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ module qoobing.com/gomod/redis -go 1.16 +go 1.18 diff --git a/xstream/reader.go b/xstream/reader.go index 3d95129..6e65e0c 100644 --- a/xstream/reader.go +++ b/xstream/reader.go @@ -1,12 +1,13 @@ package xstream import ( + "context" "errors" "fmt" "net" "time" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "qoobing.com/gomod/log" "qoobing.com/gomod/redis/sentinel" "qoobing.com/gomod/str" @@ -28,16 +29,34 @@ import ( ////// CreateTime time.Time `json:"createtime"` ////// } ////// +////// func Run(ctx context.Context) { +////// var SLEEP_TIME = 10 * time.Second +////// go func() { +////// for { +////// log.Infof("start run newaccount marketing engine ...") +////// runNewAccountMarketingEngine(ctx) +////// log.Infof("newaccount marketing engine exit unexpected, retry %s later again", SLEEP_TIME.String()) +////// time.Sleep(SLEEP_TIME) +////// } +////// }() +////// } +////// ////// func runNewAccountMarketingEngine(ctx context.Context) { ////// // r read message from redis ////// cfg := config.Instance() -////// r := NewReader(xstreamConf{ -////// group: CORESERVER_NEWACCOUNT_GROUP, -////// queue: CORESERVER_NEWACCOUNT_QUEUE, -////// sentinelConfig: cfg.CoreserverRedis, +////// r := xstream.NewReader(xstream.Config{ +////// Group: CORESERVER_NEWACCOUNT_GROUP, +////// Stream: CORESERVER_NEWACCOUNT_QUEUE, +////// Sentinel: cfg.CoreserverRedis, ////// }) ////// defer r.Close() ////// +////// // w write message to kafka +////// w := &kafka.Writer{ +////// Addr: kafka.TCP(cfg.MarketingKafka.Addresses...), +////// Balancer: &kafka.LeastBytes{}, +////// } +////// defer w.Close() ////// ////// var error_wait_time = 0 * time.Millisecond ////// for { @@ -48,7 +67,7 @@ import ( ////// ////// // Step 1. fetch message ////// var err error = nil -////// var msg xstreamMessage +////// var msg xstream.Message ////// var newacctMsg NewAccountMessage ////// log.Infof("waiting fetch newaccount message from redis queue...") ////// if msg, err = r.FetchMessage(ctx); err != nil { @@ -59,13 +78,13 @@ import ( ////// ////// // Step 2. decode message ////// if err := json.Unmarshal(msg.Value, &newacctMsg); err != nil { -////// log.Errorf("failed json.unmarshal message[%s], err:%s", string(msg.Value), err) -////// continue -////// } else if newacctMsg.Userid == 0 || newacctMsg.Account == "" { -////// log.Errorf("invalid newaccount message, userid=[%d]", newacctMsg.Userid) -////// continue -////// } -////// log.Infof("fetch newaccount message success, account=[%s]", newacctMsg.Account) +////// log.Errorf("failed json.unmarshal message[%s], err:%s", string(msg.Value), err) +////// continue +////// } else if newacctMsg.Userid == 0 || newacctMsg.Account == "" { +////// log.Errorf("invalid newaccount message, userid=[%d]", newacctMsg.Userid) +////// continue +////// } +////// log.Infof("fetch newaccount message success, account=[%s]", newacctMsg.Account) ////// ////// // Step 3. handle message ////// // xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx @@ -73,7 +92,6 @@ import ( ////// // xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx ////// // xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx ////// // xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx -////// ////// // Step 4. send ack to coreserver redis ////// if err := r.CommitMessages(ctx, msg); err != nil { ////// log.Errorf("failed commit message to redis, err:%s", err) @@ -89,12 +107,12 @@ var ( ) type Config struct { - stream string - group string - sentinelConfig sentinel.Config + Group string `toml:"group"` + Stream string `toml:"stream"` + Sentinel sentinel.Config `toml:"sentinel"` } -type xstreamMessage struct { +type Message struct { Id string Value []byte } @@ -111,9 +129,9 @@ type xstreamReader struct { func NewReader(cfg Config) (r *xstreamReader) { // redis pool - poolId := cfg.sentinelConfig.Sentinels + poolId := cfg.Sentinel.Sentinels if _, ok := redisPools[poolId]; !ok { - redisPools[poolId] = sentinel.NewPool(cfg.sentinelConfig) + redisPools[poolId] = sentinel.NewPool(cfg.Sentinel) } // initialize @@ -126,7 +144,7 @@ func NewReader(cfg Config) (r *xstreamReader) { restart: true, } r.id = str.GetRandomNumString(12) - r.name = newName(r.conn, r.id, cfg.group) + r.name = newName(r.conn, r.id, cfg.Group) if err := r.initGroup(); err != nil { panic(fmt.Sprintf("initGroup failed: error:%s", err)) } @@ -219,10 +237,10 @@ func (r *xstreamReader) Close() { func (r *xstreamReader) fetchMessage() (ret []any, err error) { // func (r *xstreamReader) fetchMessage(ctx context.Context) (ret []any, err error) { args := []interface{}{ - "GROUP", r.cfg.group, r.name, + "GROUP", r.cfg.Group, r.name, "COUNT", 1, "BLOCK", 864000000, - "STREAMS", r.cfg.stream, + "STREAMS", r.cfg.Stream, } if r.restart { args = append(args, "0-0") @@ -243,7 +261,7 @@ func (r *xstreamReader) fetchMessage() (ret []any, err error) { return ret, nil } -func (r *xstreamReader) FetchMessage() (msg xstreamMessage, err error) { +func (r *xstreamReader) FetchMessage(ctx context.Context) (msg Message, err error) { // func (r *xstreamReader) FetchMessage(ctx context.Context) (msg xstreamMessage, err error) { // Step 1. get result from redis xstream var ret, qmsg, msgs []any = nil, nil, nil @@ -289,10 +307,10 @@ func (r *xstreamReader) FetchMessage() (msg xstreamMessage, err error) { return msg, nil } -func (r *xstreamReader) CommitMessages(msg xstreamMessage) (err error) { +func (r *xstreamReader) CommitMessages(ctx context.Context, msg Message) (err error) { args := []interface{}{ - r.cfg.stream, // key - r.cfg.group, // group + r.cfg.Stream, // key + r.cfg.Group, // group msg.Id, // id } @@ -319,11 +337,11 @@ func (r *xstreamReader) redisDo(cmd string, args ...any) (reply any, err error) } func (r *xstreamReader) initGroup() error { - groups, err := r.redisDo("XINfO", "GROUPS", r.cfg.stream) + groups, err := r.redisDo("XINfO", "GROUPS", r.cfg.Stream) groupInfosInf, _ := groups.([]interface{}) if err != nil || len(groupInfosInf) == 0 { - log.Infof("create group, name: %s", r.cfg.group) - _, err = r.redisDo("XGROUP", "CREATE", r.cfg.stream, r.cfg.group, "$", "MKSTREAM") + log.Infof("create group, name: %s", r.cfg.Group) + _, err = r.redisDo("XGROUP", "CREATE", r.cfg.Stream, r.cfg.Group, "$", "MKSTREAM") return err } @@ -331,11 +349,11 @@ func (r *xstreamReader) initGroup() error { groupInfoInf := groupInfo.([]interface{}) groupName := string(groupInfoInf[1].([]uint8)) log.Infof("group name: %s", groupName) - if groupName == r.cfg.group { + if groupName == r.cfg.Group { return nil } } - log.Infof("create group, name: %s", r.cfg.group) - _, err = r.redisDo("XGROUP", "CREATE", r.cfg.stream, r.cfg.group, "$", "MKSTREAM") + log.Infof("create group, name: %s", r.cfg.Group) + _, err = r.redisDo("XGROUP", "CREATE", r.cfg.Stream, r.cfg.Group, "$", "MKSTREAM") return err }