This commit is contained in:
bryanqiu 2023-11-24 11:33:58 +08:00
parent 1e8c7645bb
commit ee6659ccb1
2 changed files with 52 additions and 34 deletions

2
go.mod
View File

@ -1,3 +1,3 @@
module qoobing.com/gomod/redis
go 1.16
go 1.18

View File

@ -1,12 +1,13 @@
package xstream
import (
"context"
"errors"
"fmt"
"net"
"time"
"github.com/garyburd/redigo/redis"
"github.com/gomodule/redigo/redis"
"qoobing.com/gomod/log"
"qoobing.com/gomod/redis/sentinel"
"qoobing.com/gomod/str"
@ -28,16 +29,34 @@ import (
////// CreateTime time.Time `json:"createtime"`
////// }
//////
////// func Run(ctx context.Context) {
////// var SLEEP_TIME = 10 * time.Second
////// go func() {
////// for {
////// log.Infof("start run newaccount marketing engine ...")
////// runNewAccountMarketingEngine(ctx)
////// log.Infof("newaccount marketing engine exit unexpected, retry %s later again", SLEEP_TIME.String())
////// time.Sleep(SLEEP_TIME)
////// }
////// }()
////// }
//////
////// func runNewAccountMarketingEngine(ctx context.Context) {
////// // r read message from redis
////// cfg := config.Instance()
////// r := NewReader(xstreamConf{
////// group: CORESERVER_NEWACCOUNT_GROUP,
////// queue: CORESERVER_NEWACCOUNT_QUEUE,
////// sentinelConfig: cfg.CoreserverRedis,
////// r := xstream.NewReader(xstream.Config{
////// Group: CORESERVER_NEWACCOUNT_GROUP,
////// Stream: CORESERVER_NEWACCOUNT_QUEUE,
////// Sentinel: cfg.CoreserverRedis,
////// })
////// defer r.Close()
//////
////// // w write message to kafka
////// w := &kafka.Writer{
////// Addr: kafka.TCP(cfg.MarketingKafka.Addresses...),
////// Balancer: &kafka.LeastBytes{},
////// }
////// defer w.Close()
//////
////// var error_wait_time = 0 * time.Millisecond
////// for {
@ -48,7 +67,7 @@ import (
//////
////// // Step 1. fetch message
////// var err error = nil
////// var msg xstreamMessage
////// var msg xstream.Message
////// var newacctMsg NewAccountMessage
////// log.Infof("waiting fetch newaccount message from redis queue...")
////// if msg, err = r.FetchMessage(ctx); err != nil {
@ -59,13 +78,13 @@ import (
//////
////// // Step 2. decode message
////// if err := json.Unmarshal(msg.Value, &newacctMsg); err != nil {
////// log.Errorf("failed json.unmarshal message[%s], err:%s", string(msg.Value), err)
////// continue
////// } else if newacctMsg.Userid == 0 || newacctMsg.Account == "" {
////// log.Errorf("invalid newaccount message, userid=[%d]", newacctMsg.Userid)
////// continue
////// }
////// log.Infof("fetch newaccount message success, account=[%s]", newacctMsg.Account)
////// log.Errorf("failed json.unmarshal message[%s], err:%s", string(msg.Value), err)
////// continue
////// } else if newacctMsg.Userid == 0 || newacctMsg.Account == "" {
////// log.Errorf("invalid newaccount message, userid=[%d]", newacctMsg.Userid)
////// continue
////// }
////// log.Infof("fetch newaccount message success, account=[%s]", newacctMsg.Account)
//////
////// // Step 3. handle message
////// // xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx
@ -73,7 +92,6 @@ import (
////// // xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx
////// // xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx
////// // xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx
//////
////// // Step 4. send ack to coreserver redis
////// if err := r.CommitMessages(ctx, msg); err != nil {
////// log.Errorf("failed commit message to redis, err:%s", err)
@ -89,12 +107,12 @@ var (
)
type Config struct {
stream string
group string
sentinelConfig sentinel.Config
Group string `toml:"group"`
Stream string `toml:"stream"`
Sentinel sentinel.Config `toml:"sentinel"`
}
type xstreamMessage struct {
type Message struct {
Id string
Value []byte
}
@ -111,9 +129,9 @@ type xstreamReader struct {
func NewReader(cfg Config) (r *xstreamReader) {
// redis pool
poolId := cfg.sentinelConfig.Sentinels
poolId := cfg.Sentinel.Sentinels
if _, ok := redisPools[poolId]; !ok {
redisPools[poolId] = sentinel.NewPool(cfg.sentinelConfig)
redisPools[poolId] = sentinel.NewPool(cfg.Sentinel)
}
// initialize
@ -126,7 +144,7 @@ func NewReader(cfg Config) (r *xstreamReader) {
restart: true,
}
r.id = str.GetRandomNumString(12)
r.name = newName(r.conn, r.id, cfg.group)
r.name = newName(r.conn, r.id, cfg.Group)
if err := r.initGroup(); err != nil {
panic(fmt.Sprintf("initGroup failed: error:%s", err))
}
@ -219,10 +237,10 @@ func (r *xstreamReader) Close() {
func (r *xstreamReader) fetchMessage() (ret []any, err error) {
// func (r *xstreamReader) fetchMessage(ctx context.Context) (ret []any, err error) {
args := []interface{}{
"GROUP", r.cfg.group, r.name,
"GROUP", r.cfg.Group, r.name,
"COUNT", 1,
"BLOCK", 864000000,
"STREAMS", r.cfg.stream,
"STREAMS", r.cfg.Stream,
}
if r.restart {
args = append(args, "0-0")
@ -243,7 +261,7 @@ func (r *xstreamReader) fetchMessage() (ret []any, err error) {
return ret, nil
}
func (r *xstreamReader) FetchMessage() (msg xstreamMessage, err error) {
func (r *xstreamReader) FetchMessage(ctx context.Context) (msg Message, err error) {
// func (r *xstreamReader) FetchMessage(ctx context.Context) (msg xstreamMessage, err error) {
// Step 1. get result from redis xstream
var ret, qmsg, msgs []any = nil, nil, nil
@ -289,10 +307,10 @@ func (r *xstreamReader) FetchMessage() (msg xstreamMessage, err error) {
return msg, nil
}
func (r *xstreamReader) CommitMessages(msg xstreamMessage) (err error) {
func (r *xstreamReader) CommitMessages(ctx context.Context, msg Message) (err error) {
args := []interface{}{
r.cfg.stream, // key
r.cfg.group, // group
r.cfg.Stream, // key
r.cfg.Group, // group
msg.Id, // id
}
@ -319,11 +337,11 @@ func (r *xstreamReader) redisDo(cmd string, args ...any) (reply any, err error)
}
func (r *xstreamReader) initGroup() error {
groups, err := r.redisDo("XINfO", "GROUPS", r.cfg.stream)
groups, err := r.redisDo("XINfO", "GROUPS", r.cfg.Stream)
groupInfosInf, _ := groups.([]interface{})
if err != nil || len(groupInfosInf) == 0 {
log.Infof("create group, name: %s", r.cfg.group)
_, err = r.redisDo("XGROUP", "CREATE", r.cfg.stream, r.cfg.group, "$", "MKSTREAM")
log.Infof("create group, name: %s", r.cfg.Group)
_, err = r.redisDo("XGROUP", "CREATE", r.cfg.Stream, r.cfg.Group, "$", "MKSTREAM")
return err
}
@ -331,11 +349,11 @@ func (r *xstreamReader) initGroup() error {
groupInfoInf := groupInfo.([]interface{})
groupName := string(groupInfoInf[1].([]uint8))
log.Infof("group name: %s", groupName)
if groupName == r.cfg.group {
if groupName == r.cfg.Group {
return nil
}
}
log.Infof("create group, name: %s", r.cfg.group)
_, err = r.redisDo("XGROUP", "CREATE", r.cfg.stream, r.cfg.group, "$", "MKSTREAM")
log.Infof("create group, name: %s", r.cfg.Group)
_, err = r.redisDo("XGROUP", "CREATE", r.cfg.Stream, r.cfg.Group, "$", "MKSTREAM")
return err
}