From ec06dd12340920825a1a400b2a3f284ad6da6c8e Mon Sep 17 00:00:00 2001 From: bryanqiu Date: Fri, 28 Jun 2024 18:19:17 +0800 Subject: [PATCH] first commit --- go.mod | 14 ++ logging/logging.go | 162 +++++++++++++++ redis.go | 25 +++ redis/redis.go | 98 +++++++++ redis_test.go | 65 ++++++ sentinel/sentinel.go | 460 +++++++++++++++++++++++++++++++++++++++++++ xstream/reader.go | 372 ++++++++++++++++++++++++++++++++++ xstream/writer.go | 20 ++ 8 files changed, 1216 insertions(+) create mode 100644 go.mod create mode 100644 logging/logging.go create mode 100644 redis.go create mode 100644 redis/redis.go create mode 100644 redis_test.go create mode 100644 sentinel/sentinel.go create mode 100644 xstream/reader.go create mode 100644 xstream/writer.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..630d130 --- /dev/null +++ b/go.mod @@ -0,0 +1,14 @@ +module qoobing.com/gomod/redis + +go 1.18 + +require ( + github.com/gomodule/redigo v1.9.2 + qoobing.com/gomod/log v1.2.8 + qoobing.com/gomod/str v1.0.5 +) + +require ( + github.com/tylerb/gls v0.0.0-20150407001822-e606233f194d // indirect + github.com/tylerb/is v2.1.4+incompatible // indirect +) diff --git a/logging/logging.go b/logging/logging.go new file mode 100644 index 0000000..a9e1e3c --- /dev/null +++ b/logging/logging.go @@ -0,0 +1,162 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package logging + +import ( + "bytes" + "context" + "fmt" + "log" + "time" + + "github.com/gomodule/redigo/redis" +) + +var ( + _ redis.ConnWithTimeout = (*loggingConn)(nil) + logCallDepth = 3 +) + +// NewLoggingConn returns a logging wrapper around a connection. +func NewLoggingConn(conn redis.Conn, logger *log.Logger, prefix string) redis.Conn { + if prefix != "" { + prefix = prefix + "." + } + return &loggingConn{conn, logger, prefix, nil} +} + +// NewLoggingConnFilter returns a logging wrapper around a connection and a filter function. +func NewLoggingConnFilter(conn redis.Conn, logger *log.Logger, prefix string, skip func(cmdName string) bool) redis.Conn { + if prefix != "" { + prefix = prefix + "." + } + return &loggingConn{conn, logger, prefix, skip} +} + +type loggingConn struct { + redis.Conn + logger *log.Logger + prefix string + skip func(cmdName string) bool +} + +func (c *loggingConn) Close() error { + err := c.Conn.Close() + var buf bytes.Buffer + fmt.Fprintf(&buf, "%sClose() -> (%v)", c.prefix, err) + c.logger.Output(logCallDepth, buf.String()) // nolint: errcheck + return err +} + +func (c *loggingConn) printValue(buf *bytes.Buffer, v interface{}) { + const chop = 128 + switch v := v.(type) { + case []byte: + if len(v) > chop { + fmt.Fprintf(buf, "%q...", v[:chop]) + } else { + fmt.Fprintf(buf, "%q", v) + } + case string: + if len(v) > chop { + fmt.Fprintf(buf, "%q...", v[:chop]) + } else { + fmt.Fprintf(buf, "%q", v) + } + case []interface{}: + if len(v) == 0 { + buf.WriteString("[]") + } else { + sep := "[" + fin := "]" + if len(v) > chop { + v = v[:chop] + fin = "...]" + } + for _, vv := range v { + buf.WriteString(sep) + c.printValue(buf, vv) + sep = ", " + } + buf.WriteString(fin) + } + default: + fmt.Fprint(buf, v) + } +} + +func (c *loggingConn) print(method, commandName string, args []interface{}, reply interface{}, err error) { + if c.skip != nil && c.skip(commandName) { + return + } + var buf bytes.Buffer + fmt.Fprintf(&buf, "%s%s(", c.prefix, method) + if method != "Receive" { + buf.WriteString(commandName) + for _, arg := range args { + buf.WriteString(", ") + c.printValue(&buf, arg) + } + } + buf.WriteString(") -> (") + if method != "Send" { + c.printValue(&buf, reply) + buf.WriteString(", ") + } + fmt.Fprintf(&buf, "%v)", err) + c.logger.Output(logCallDepth+1, buf.String()) // nolint: errcheck +} + +func (c *loggingConn) Do(commandName string, args ...interface{}) (interface{}, error) { + reply, err := c.Conn.Do(commandName, args...) + c.print("Do", commandName, args, reply, err) + return reply, err +} + +func (c *loggingConn) DoContext(ctx context.Context, commandName string, args ...interface{}) (interface{}, error) { + reply, err := redis.DoContext(c.Conn, ctx, commandName, args...) + c.print("DoContext", commandName, args, reply, err) + return reply, err +} + +func (c *loggingConn) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (interface{}, error) { + reply, err := redis.DoWithTimeout(c.Conn, timeout, commandName, args...) + c.print("DoWithTimeout", commandName, args, reply, err) + return reply, err +} + +func (c *loggingConn) Send(commandName string, args ...interface{}) error { + err := c.Conn.Send(commandName, args...) + c.print("Send", commandName, args, nil, err) + return err +} + +func (c *loggingConn) Receive() (interface{}, error) { + reply, err := c.Conn.Receive() + c.print("Receive", "", nil, reply, err) + return reply, err +} + +func (c *loggingConn) ReceiveContext(ctx context.Context) (interface{}, error) { + reply, err := redis.ReceiveContext(c.Conn, ctx) + c.print("ReceiveContext", "", nil, reply, err) + return reply, err +} + +func (c *loggingConn) ReceiveWithTimeout(timeout time.Duration) (interface{}, error) { + reply, err := redis.ReceiveWithTimeout(c.Conn, timeout) + c.print("ReceiveWithTimeout", "", nil, reply, err) + return reply, err +} diff --git a/redis.go b/redis.go new file mode 100644 index 0000000..20808a7 --- /dev/null +++ b/redis.go @@ -0,0 +1,25 @@ +package redis + +import ( + redigo "github.com/gomodule/redigo/redis" + "qoobing.com/gomod/redis/redis" + "qoobing.com/gomod/redis/sentinel" +) + +type ( + Conn = redigo.Conn + Pool = redigo.Pool + Config = sentinel.Config +) + +var NewRedisPool = redis.NewPool +var NewSentinelPool = sentinel.NewPool + +func NewPool(cfg Config) *redigo.Pool { + if cfg.Master != "" { + return NewRedisPool(cfg) + } else if cfg.MasterName != "" { + return NewSentinelPool(cfg) + } + panic("invalid config: Master & MasterName are both empty") +} diff --git a/redis/redis.go b/redis/redis.go new file mode 100644 index 0000000..8f660e8 --- /dev/null +++ b/redis/redis.go @@ -0,0 +1,98 @@ +package redis + +import ( + "fmt" + "log" + "os" + "time" + + "github.com/gomodule/redigo/redis" + "qoobing.com/gomod/redis/logging" + "qoobing.com/gomod/redis/sentinel" +) + +type Config = sentinel.Config + +func NewPool(cfg Config) *redis.Pool { + if cfg.Master == "" { + panic("config is invalid: Master is empty") + } else if cfg.Master != "" && cfg.MasterName != "" { + //panic("config is invalid: Master & MasterName must not set both") + } + var ( + masterAddr = cfg.Master + masterUsername = cfg.Username + masterPassword = cfg.Password + ) + if cfg.Wait == nil { + cfg.Wait = new(bool) + *cfg.Wait = true + } + + if cfg.IdleTimeout == nil { + cfg.IdleTimeout = new(int) + *cfg.IdleTimeout = 300 + } else if *cfg.IdleTimeout <= 0 { + *cfg.IdleTimeout = 86400 * 365 * 10 + } + + if cfg.MaxIdle == nil { + cfg.MaxIdle = new(int) + *cfg.MaxIdle = 100 + } else if *cfg.MaxIdle < 0 { + *cfg.MaxIdle = 100 + } + + if cfg.MaxActive == nil { + cfg.MaxActive = new(int) + *cfg.MaxActive = 100 + } else if *cfg.MaxActive < 0 { + *cfg.MaxActive = 100 + } + + var ( + logPrefix = "redis" + logStdPrefix = "DBUG " + logStdWriter = os.Stdout + logStdFlags = log.Ldate | log.Lmicroseconds | log.Lshortfile + logStdLogger = log.New(logStdWriter, logStdPrefix, logStdFlags) + ) + return &redis.Pool{ + MaxIdle: *cfg.MaxIdle, + MaxActive: *cfg.MaxActive, + Wait: *cfg.Wait, + IdleTimeout: time.Duration(*cfg.IdleTimeout) * time.Second, + Dial: func() (redis.Conn, error) { + c, err := redis.Dial("tcp", masterAddr) + if err != nil { + return nil, err + } + + var okstr = "OK" + if masterPassword != "" && masterUsername != "" { + okstr, err = redis.String(c.Do("AUTH", masterUsername, masterPassword)) + } else if masterPassword != "" { + okstr, err = redis.String(c.Do("AUTH", masterPassword)) + } + + if err != nil { + return nil, fmt.Errorf("redis master AUTH failed: <%s>", err.Error()) + } else if okstr != "OK" { + return nil, fmt.Errorf("redis master AUTH failed: <%s>", okstr) + } + + //// if !TestRole(c, "master") { + //// c.Close() + //// err = fmt.Errorf( + //// "master(%s) got by name '%s' is not redis master", + //// masterAddr, masterName) + //// return nil, err + //// } + + if cfg.Debug { + c = logging.NewLoggingConn(c, logStdLogger, logPrefix) + } + return c, nil + }, + } +} diff --git a/redis_test.go b/redis_test.go new file mode 100644 index 0000000..1a7155c --- /dev/null +++ b/redis_test.go @@ -0,0 +1,65 @@ +package redis + +import ( + "fmt" + "log" + "testing" + + "qoobing.com/gomod/redis/sentinel" + "qoobing.com/gomod/redis/xstream" +) + +func TestNewConsumer(t *testing.T) { + cfg := xstream.Config{ + Group: "group_a", + Stream: "testxstream_queue", + Sentinel: sentinel.Config{ + Debug: true, //调试开关(会在日志打印REDIS语句) + MasterName: "walletmaster", //REDIS主名称(一个哨兵集群可管理多个REDIS主从结构) + Username: "web3wallet", //REDIS连接用户名 + Password: "web3onthemoon@2023#dev", //REDIS连接用户密码 + Sentinels: "sentinel-0.redis:5000", //哨兵节点列表,逗号分隔,一般配置三个 + SentinelUsername: "sentinel", //哨兵节点连接用户名,不填默认default + SentinelPassword: "password@mhxzkhl#dev", //哨兵节点连接密码,不填认为集群无密码 + }, + } + + for i := 0; i < 15; i++ { + i := i + name := fmt.Sprintf("name-%d", i) + t.Run(name, func(t *testing.T) { + t.Parallel() + log.Println("name", name) + if r := xstream.NewReader(cfg); r == nil { + t.Error("create new reader failed") + } + }) + } +} + +func TestNewPool(t *testing.T) { + cfg := Config{ + Debug: true, //调试开关(会在日志打印REDIS语句) + Master: "redis-0.redis:2379", //REDIS主 + Username: "web3wallet", //REDIS连接用户名 + Password: "web3onthemoon@2023#dev", //REDIS连接用户密码 + Sentinels: "sentinel-0.redis:5000", //哨兵节点列表,逗号分隔,一般配置三个 + SentinelUsername: "sentinel", //哨兵节点连接用户名,不填默认default + SentinelPassword: "password@mhxzkhl#dev", //哨兵节点连接密码,不填认为集群无密码 + } + pool := NewPool(cfg) + if pool == nil { + t.Errorf("new pool failed") + } + rds := pool.Get() + if rds == nil { + t.Errorf("get redis failed") + } + if ret, err := rds.Do("SET", "ABC", "123"); err != nil { + t.Errorf("get redis failed: %s", err) + } else { + fmt.Println("ABC", ret) + } + fmt.Println(rds.Do("GET", "ABC")) + +} diff --git a/sentinel/sentinel.go b/sentinel/sentinel.go new file mode 100644 index 0000000..e09ed97 --- /dev/null +++ b/sentinel/sentinel.go @@ -0,0 +1,460 @@ +package sentinel + +import ( + "errors" + "fmt" + "log" + "net" + "os" + "strings" + "sync" + "time" + + "github.com/gomodule/redigo/redis" + "qoobing.com/gomod/redis/logging" +) + +type Config struct { + Debug bool `toml:"debug"` //调试开关(会在日志打印REDIS语句) + Master string `toml:"master"` //REDIS主host&port 与 MasterName 互斥 + MasterName string `toml:"mastername"` //REDIS主名称,值不为空是启动sentinal模式, 与Master互斥 + Username string `toml:"username"` //REDIS连接用户名 + Password string `toml:"password"` //REDIS连接用户密码 + Sentinels string `toml:"sentinels"` //哨兵节点列表,逗号分隔,一般配置三个 + SentinelUsername string `toml:"sentinel_username"` //哨兵节点连接用户名,不填默认default + SentinelPassword string `toml:"sentinel_password"` //哨兵节点连接密码,不填认为集群无密码 + Wait *bool `toml:"wait"` //当无可用连接时,是否等待 + MaxIdle *int `toml:"max_idle"` //最大空闲连接数 + MaxActive *int `toml:"max_active"` //最大活跃连接数 + IdleTimeout *int `toml:"idle_timeout"` //空闲超时时间 +} + +type Sentinel struct { + // Addrs is a slice with known Sentinel addresses. + Addrs []string + + // MasterName is a name of Redis master Sentinel servers monitor. + MasterName string + + // Dial is a user supplied function to connect to Sentinel on given address. This + // address will be chosen from Addrs slice. + // Note that as per the redis-sentinel client guidelines, a timeout is mandatory + // while connecting to Sentinels, and should not be set to 0. + Dial func(addr string) (redis.Conn, error) + + // Pool is a user supplied function returning custom connection pool to Sentinel. + // This can be useful to tune options if you are not satisfied with what default + // Sentinel pool offers. See defaultPool() method for default pool implementation. + // In most cases you only need to provide Dial function and let this be nil. + Pool func(addr string) *redis.Pool + + mu sync.RWMutex + pools map[string]*redis.Pool + addr string +} + +func NewPool(cfg Config) *redis.Pool { + if cfg.MasterName == "" { + panic("config is invalid: MasterName is empty") + } else if cfg.Master != "" && cfg.MasterName != "" { + //panic("config is invalid: Master & MasterName must not set both") + } + var ( + masterName = cfg.MasterName + masterUsername = cfg.Username + masterPassword = cfg.Password + sentinelUsername = cfg.SentinelUsername + sentinelPassword = cfg.SentinelPassword + sntnl = &Sentinel{ + Addrs: strings.Split(cfg.Sentinels, ","), + MasterName: masterName, + Dial: func(addr string) (redis.Conn, error) { + timeout := 1000 * time.Millisecond + c, err := redis.DialTimeout("tcp", addr, timeout, timeout, timeout) + if err != nil { + return nil, err + } + var okstr = "OK" + if sentinelPassword != "" && sentinelUsername != "" { + okstr, err = redis.String(c.Do("AUTH", sentinelUsername, sentinelPassword)) + } else if sentinelPassword != "" { + okstr, err = redis.String(c.Do("AUTH", sentinelPassword)) + } + + if err != nil { + return nil, fmt.Errorf("redis sentinel AUTH failed: <%s>", err.Error()) + } else if okstr != "OK" { + return nil, fmt.Errorf("redis sentinel AUTH failed: <%s>", okstr) + } + return c, nil + }, + } + ) + if cfg.Wait == nil { + cfg.Wait = new(bool) + *cfg.Wait = true + } + if cfg.IdleTimeout == nil { + cfg.IdleTimeout = new(int) + *cfg.IdleTimeout = 300 + } else if *cfg.IdleTimeout <= 0 { + *cfg.IdleTimeout = 86400 * 365 * 10 + } + + if cfg.MaxIdle == nil { + cfg.MaxIdle = new(int) + *cfg.MaxIdle = 100 + } else if *cfg.MaxIdle < 0 { + *cfg.MaxIdle = 100 + } + + if cfg.MaxActive == nil { + cfg.MaxActive = new(int) + *cfg.MaxActive = 100 + } else if *cfg.MaxActive < 0 { + *cfg.MaxActive = 100 + } + + var ( + logPrefix = "redis" + logStdPrefix = "DBUG " + logStdWriter = os.Stdout + logStdFlags = log.Ldate | log.Lmicroseconds | log.Lshortfile + logStdLogger = log.New(logStdWriter, logStdPrefix, logStdFlags) + ) + return &redis.Pool{ + MaxIdle: *cfg.MaxIdle, + MaxActive: *cfg.MaxActive, + Wait: *cfg.Wait, + IdleTimeout: time.Duration(*cfg.IdleTimeout) * time.Second, + Dial: func() (redis.Conn, error) { + masterAddr, err := sntnl.MasterAddr() + if err != nil { + return nil, err + } + + c, err := redis.Dial("tcp", masterAddr) + if err != nil { + return nil, err + } + + var okstr = "OK" + if masterPassword != "" && masterUsername != "" { + okstr, err = redis.String(c.Do("AUTH", masterUsername, masterPassword)) + } else if masterPassword != "" { + okstr, err = redis.String(c.Do("AUTH", masterPassword)) + } + + if err != nil { + return nil, fmt.Errorf("redis master AUTH failed: <%s>", err.Error()) + } else if okstr != "OK" { + return nil, fmt.Errorf("redis master AUTH failed: <%s>", okstr) + } + + if !TestRole(c, "master") { + c.Close() + err = fmt.Errorf( + "master(%s) got by name '%s' is not redis master", + masterAddr, masterName) + return nil, err + } + + if cfg.Debug { + c = logging.NewLoggingConn(c, logStdLogger, logPrefix) + } + return c, nil + }, + } +} + +// NoSentinelsAvailable is returned when all sentinels in the list are exhausted +// (or none configured), and contains the last error returned by Dial (which +// may be nil) +type NoSentinelsAvailable struct { + lastError error +} + +func (ns NoSentinelsAvailable) Error() string { + if ns.lastError != nil { + return fmt.Sprintf("redigo: no sentinels available; last error: %s", ns.lastError.Error()) + } + return fmt.Sprintf("redigo: no sentinels available") +} + +// putToTop puts Sentinel address to the top of address list - this means +// that all next requests will use Sentinel on this address first. +// +// From Sentinel guidelines: +// +// The first Sentinel replying to the client request should be put at the +// start of the list, so that at the next reconnection, we'll try first +// the Sentinel that was reachable in the previous connection attempt, +// minimizing latency. +func (s *Sentinel) putToTop(addr string) { + s.mu.Lock() + defer s.mu.Unlock() + for i, a := range s.Addrs { + if a == addr { + s.Addrs[0], s.Addrs[i] = s.Addrs[i], s.Addrs[0] + break + } + } +} + +// putToBottom puts Sentinel address to the bottom of address list. +// We call this method internally when see that some Sentinel failed to answer +// on application request so next time we start with another one. +func (s *Sentinel) putToBottom(addr string) { + s.mu.Lock() + defer s.mu.Unlock() + for i, a := range s.Addrs { + if a == addr { + copy(s.Addrs[i:], s.Addrs[i+1:]) + s.Addrs[len(s.Addrs)-1] = a + break + } + } +} + +// defaultPool returns a connection pool to one Sentinel. This allows +// us to call concurrent requests to Sentinel using connection Do method. +func (s *Sentinel) defaultPool(addr string) *redis.Pool { + return &redis.Pool{ + MaxIdle: 3, + MaxActive: 10, + Wait: true, + IdleTimeout: 240 * time.Second, + Dial: func() (redis.Conn, error) { + return s.Dial(addr) + }, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + _, err := c.Do("PING") + return err + }, + } +} + +func (s *Sentinel) poolForAddr(addr string) *redis.Pool { + s.mu.Lock() + defer s.mu.Unlock() + if pool, ok := s.pools[addr]; ok { + return pool + } + s.mu.Unlock() + newPool := s.newPool(addr) + s.mu.Lock() + if pool, ok := s.pools[addr]; ok { + return pool + } + if s.pools == nil { + s.pools = make(map[string]*redis.Pool) + } + s.pools[addr] = newPool + return newPool +} + +func (s *Sentinel) newPool(addr string) *redis.Pool { + if s.Pool != nil { + return s.Pool(addr) + } + return s.defaultPool(addr) +} + +// close connection pool to Sentinel. +// Lock must be hold by caller. +func (s *Sentinel) close() { + for _, pool := range s.pools { + pool.Close() + } + s.pools = nil +} + +func (s *Sentinel) doUntilSuccess(f func(redis.Conn) (interface{}, error)) (interface{}, error) { + s.mu.RLock() + addrs := s.Addrs + s.mu.RUnlock() + + var lastErr error + + for _, addr := range addrs { + conn := s.poolForAddr(addr).Get() + reply, err := f(conn) + conn.Close() + if err != nil { + lastErr = err + s.putToBottom(addr) + continue + } + s.putToTop(addr) + return reply, nil + } + + return nil, NoSentinelsAvailable{lastError: lastErr} +} + +// MasterAddr returns an address of current Redis master instance. +func (s *Sentinel) MasterAddr() (string, error) { + return redis.String(s.doUntilSuccess(func(c redis.Conn) (interface{}, error) { + return queryForMaster(c, s.MasterName) + })) +} + +// SlaveAddrs returns a slice with known slave addresses of current master instance. +func (s *Sentinel) SlaveAddrs() ([]string, error) { + return redis.Strings(s.doUntilSuccess(func(c redis.Conn) (interface{}, error) { + return queryForSlaveAddrs(c, s.MasterName) + })) +} + +// Slave represents a Redis slave instance which is known by Sentinel. +type Slave struct { + ip string + port string + flags string +} + +// Addr returns an address of slave. +func (s *Slave) Addr() string { + return net.JoinHostPort(s.ip, s.port) +} + +// Available returns if slave is in working state at moment based on information in slave flags. +func (s *Slave) Available() bool { + return !strings.Contains(s.flags, "disconnected") && !strings.Contains(s.flags, "s_down") +} + +// Slaves returns a slice with known slaves of master instance. +func (s *Sentinel) Slaves() ([]*Slave, error) { + res, err := s.doUntilSuccess(func(c redis.Conn) (interface{}, error) { + return queryForSlaves(c, s.MasterName) + }) + if err != nil { + return nil, err + } + return res.([]*Slave), nil +} + +// SentinelAddrs returns a slice of known Sentinel addresses Sentinel server aware of. +func (s *Sentinel) SentinelAddrs() ([]string, error) { + return redis.Strings(s.doUntilSuccess(func(c redis.Conn) (interface{}, error) { + return queryForSentinels(c, s.MasterName) + })) +} + +// Discover allows to update list of known Sentinel addresses. From docs: +// +// A client may update its internal list of Sentinel nodes following this procedure: +// 1) Obtain a list of other Sentinels for this master using the command SENTINEL sentinels . +// 2) Add every ip:port pair not already existing in our list at the end of the list. +func (s *Sentinel) Discover() error { + addrs, err := s.SentinelAddrs() + if err != nil { + return err + } + s.mu.Lock() + for _, addr := range addrs { + if !stringInSlice(addr, s.Addrs) { + s.Addrs = append(s.Addrs, addr) + } + } + s.mu.Unlock() + return nil +} + +// Close closes current connection to Sentinel. +func (s *Sentinel) Close() error { + s.mu.Lock() + s.close() + s.mu.Unlock() + return nil +} + +// TestRole wraps GetRole in a test to verify if the role matches an expected +// role string. If there was any error in querying the supplied connection, +// the function returns false. Works with Redis >= 2.8.12. +// It's not goroutine safe, but if you call this method on pooled connections +// then you are OK. +func TestRole(c redis.Conn, expectedRole string) bool { + role, err := getRole(c) + return err == nil && role == expectedRole +} + +// getRole is a convenience function supplied to query an instance (master or +// slave) for its role. It attempts to use the ROLE command introduced in +// redis 2.8.12. +func getRole(c redis.Conn) (string, error) { + res, err := c.Do("ROLE") + if err != nil { + return "", err + } + rres, ok := res.([]interface{}) + if ok { + return redis.String(rres[0], nil) + } + return "", errors.New("redigo: can not transform ROLE reply to string") +} + +func queryForMaster(conn redis.Conn, masterName string) (string, error) { + res, err := redis.Strings(conn.Do("SENTINEL", "get-master-addr-by-name", masterName)) + if err != nil { + return "", err + } + if len(res) < 2 { + return "", errors.New("redigo: malformed get-master-addr-by-name reply") + } + masterAddr := net.JoinHostPort(res[0], res[1]) + return masterAddr, nil +} + +func queryForSlaveAddrs(conn redis.Conn, masterName string) ([]string, error) { + slaves, err := queryForSlaves(conn, masterName) + if err != nil { + return nil, err + } + slaveAddrs := make([]string, len(slaves)) + for i, slave := range slaves { + slaveAddrs[i] = slave.Addr() + } + return slaveAddrs, nil +} + +func queryForSlaves(conn redis.Conn, masterName string) ([]*Slave, error) { + res, err := redis.Values(conn.Do("SENTINEL", "slaves", masterName)) + if err != nil { + return nil, err + } + slaves := make([]*Slave, len(res)) + for i, a := range res { + sm, err := redis.StringMap(a, err) + if err != nil { + return nil, err + } + slaves[i] = &Slave{ip: sm["ip"], port: sm["port"], flags: sm["flags"]} + } + return slaves, nil +} + +func queryForSentinels(conn redis.Conn, masterName string) ([]string, error) { + res, err := redis.Values(conn.Do("SENTINEL", "sentinels", masterName)) + if err != nil { + return nil, err + } + sentinels := make([]string, len(res)) + for i, a := range res { + sm, err := redis.StringMap(a, err) + if err != nil { + return nil, err + } + sentinels[i] = fmt.Sprintf("%s:%s", sm["ip"], sm["port"]) + } + return sentinels, nil +} + +func stringInSlice(str string, slice []string) bool { + for _, s := range slice { + if s == str { + return true + } + } + return false +} diff --git a/xstream/reader.go b/xstream/reader.go new file mode 100644 index 0000000..daeed17 --- /dev/null +++ b/xstream/reader.go @@ -0,0 +1,372 @@ +package xstream + +import ( + "context" + "errors" + "fmt" + "net" + "time" + + "github.com/gomodule/redigo/redis" + "qoobing.com/gomod/log" + "qoobing.com/gomod/redis/sentinel" + "qoobing.com/gomod/str" +) + +////////////////////////////////////////////////////////////////////////////////// +///////////////////////// example ////////////////////////////////////////////// +////////////////////////////////////////////////////////////////////////////////// +////// const ( +////// CORESERVER_NEWACCOUNT_GROUP = "groupid-coreserver-newaccount" +////// CORESERVER_NEWACCOUNT_QUEUE = "coreserver.newaccount" +////// ) +////// +////// type NewAccountMessage struct { +////// Sign string `json:"sign"` +////// Appid string `json:"appid"` +////// Userid int64 `json:"userid"` +////// Account string `json:"account"` +////// CreateTime time.Time `json:"createtime"` +////// } +////// +////// func Run(ctx context.Context) { +////// var SLEEP_TIME = 10 * time.Second +////// go func() { +////// for { +////// log.Infof("start run newaccount marketing engine ...") +////// runNewAccountMarketingEngine(ctx) +////// log.Infof("newaccount marketing engine exit unexpected, retry %s later again", SLEEP_TIME.String()) +////// time.Sleep(SLEEP_TIME) +////// } +////// }() +////// } +////// +////// func runNewAccountMarketingEngine(ctx context.Context) { +////// // r read message from redis +////// cfg := config.Instance() +////// r := xstream.NewReader(xstream.Config{ +////// Group: CORESERVER_NEWACCOUNT_GROUP, +////// Stream: CORESERVER_NEWACCOUNT_QUEUE, +////// Sentinel: cfg.CoreserverRedis, +////// }) +////// defer r.Close() +////// +////// // w write message to kafka +////// w := &kafka.Writer{ +////// Addr: kafka.TCP(cfg.MarketingKafka.Addresses...), +////// Balancer: &kafka.LeastBytes{}, +////// } +////// defer w.Close() +////// +////// var error_wait_time = 0 * time.Millisecond +////// for { +////// // Step 0. wait a moment if need +////// if error_wait_time > 0 { +////// time.Sleep(error_wait_time) +////// } +////// +////// // Step 1. fetch message +////// var err error = nil +////// var msg xstream.Message +////// var newacctMsg NewAccountMessage +////// log.Infof("waiting fetch newaccount message from redis queue...") +////// if msg, err = r.FetchMessage(ctx); err != nil { +////// log.Errorf("failed fetch message from redis queue, err:%s", err) +////// //error_wait_time = 1000 * time.Millisecond +////// break +////// } +////// +////// // Step 2. decode message +////// if err := json.Unmarshal(msg.Value, &newacctMsg); err != nil { +////// log.Errorf("failed json.unmarshal message[%s], err:%s", string(msg.Value), err) +////// continue +////// } else if newacctMsg.Userid == 0 || newacctMsg.Account == "" { +////// log.Errorf("invalid newaccount message, userid=[%d]", newacctMsg.Userid) +////// continue +////// } +////// log.Infof("fetch newaccount message success, account=[%s]", newacctMsg.Account) +////// +////// // Step 3. handle message +////// // xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx +////// // xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx +////// // xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx +////// // xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx +////// // xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx +////// // Step 4. send ack to coreserver redis +////// if err := r.CommitMessages(ctx, msg); err != nil { +////// log.Errorf("failed commit message to redis, err:%s", err) +////// error_wait_time = 1000 * time.Millisecond +////// continue +////// } +////// } +////// } +///////////////////////////// end example //////////////////////////////////////////// + +var ( + redisPools = map[string]*redis.Pool{} +) + +type Config struct { + Group string `toml:"group"` + Stream string `toml:"stream"` + Sentinel sentinel.Config `toml:"sentinel"` +} + +type Message struct { + Id string + Value []byte +} + +type xstreamReader struct { + cfg Config + id string + name string + conn redis.Conn + connPool *redis.Pool + closed bool + restart bool +} + +func NewReader(cfg Config) (r *xstreamReader) { + // redis pool + poolId := cfg.Sentinel.Sentinels + if _, ok := redisPools[poolId]; !ok { + redisPools[poolId] = sentinel.NewPool(cfg.Sentinel) + } + + // initialize + connPool := redisPools[poolId] + r = &xstreamReader{ + cfg: cfg, + conn: connPool.Get(), + connPool: connPool, + closed: false, + restart: true, + } + r.id = str.GetRandomNumString(12) + r.name = newName(r.conn, r.id, cfg.Group, cfg.Stream) + if err := r.initGroup(); err != nil { + panic(fmt.Sprintf("initGroup failed: error:%s", err)) + } + + // heart beat + go func() { + beatconn := redisPools[poolId].Get() + defer beatconn.Close() + for !r.closed { + id, err := redis.String(beatconn.Do("GETEX", r.name, "EX", 120)) + if err != nil { + log.Warningf("redis GETEX failed, err:%s", err) + r.Close() + } else if id != r.id { + log.Warningf("name id in redis (%s) not equal to local name id(%s)", id, r.id) + r.Close() + } + time.Sleep(15 * time.Second) + } + }() + + return r +} + +func newName(conn redis.Conn, id, group, xstream string) string { + var ( + mkeys = []any{} + prefix = fmt.Sprintf("xstream[%s]-group[%s]-", xstream, group) + consumer = func(i int) string { return fmt.Sprintf("%s%03d", prefix, i) } + consumers = map[string]string{} + MAX_CONSUMER_NUM = 30 + ) + + // get all name + for i := 0; i < MAX_CONSUMER_NUM; i++ { + mkeys = append(mkeys, consumer(i)) + } + ret, err := redis.Strings(conn.Do("MGET", mkeys...)) + if err != nil { + log.Warningf("redis MGET error: %s", err) + panic("redis MGET error") + } + for i, s := range ret { + if s != "" { + name := consumer(i) + consumers[name] = s + } + } + + var ( + i = 0 + retry = 0 + consumer_i = "" + ) +LOOP: + // get unused name + for ; i < MAX_CONSUMER_NUM; i++ { + if _, ok := consumers[consumer(i)]; !ok { + consumer_i = consumer(i) + break + } + } + if i == MAX_CONSUMER_NUM { + panic(fmt.Sprintf("too many consumer(more than %d)???", MAX_CONSUMER_NUM)) + } + + // set to redis (try lock) + success, err := redis.Int(conn.Do("SETNX", consumer_i, id)) + conn.Do("GETEX", consumer_i, "EX", 120) /* for SETNX failed forever */ + if err != nil && retry >= 2 { + log.Warningf("redis SETNX(%s,%s) error: %s", consumer_i, id, err) + panic(fmt.Sprintf("redis SETNX error after try [%d] times", retry)) + } + + // return or retry if something error + if err != nil /*&& retry < 2*/ { + retry++ + log.Warningf("redis SETNX(%s,%s) error: %s, retry(%d)", consumer_i, id, err, retry) + goto LOOP + } + if success == 0 { + i++ + retry++ + log.Warningf("consumer name(%s) used by others, retry(%d)", consumer_i, retry) + goto LOOP + } + log.Infof("success to get new consumer-name:[%s]", consumer_i) + return consumer_i +} + +func (r *xstreamReader) Close() { + // delete client consumer name + id, err := redis.String(r.redisDo("GETEX", r.name, "EX", 120)) + if err == nil && id == r.id { + r.redisDo("DEL", r.name) + } + + // clonse redis connection(put to pool) + r.closed = true + r.conn.Close() +} + +func (r *xstreamReader) fetchMessage() (ret []any, err error) { + // func (r *xstreamReader) fetchMessage(ctx context.Context) (ret []any, err error) { + args := []interface{}{ + "GROUP", r.cfg.Group, r.name, + "COUNT", 1, + "BLOCK", 864000000, + "STREAMS", r.cfg.Stream, + } + if r.restart { + args = append(args, "0-0") + } else { + args = append(args, ">") + } + + ret, err = redis.Values(r.redisDo("XREADGROUP", args...)) + + if err != nil { + return ret, err + } else if ret == nil { + return ret, nil + } + if len(ret) != 1 { + panic("len(ret) is not 1") + } + return ret, nil +} + +func (r *xstreamReader) FetchMessage(ctx context.Context) (msg Message, err error) { + // func (r *xstreamReader) FetchMessage(ctx context.Context) (msg xstreamMessage, err error) { + // Step 1. get result from redis xstream + var ret, qmsg, msgs []any = nil, nil, nil + for { + ret, err = r.fetchMessage() + ////////////////////////////////////////////////////////////////////// + ////ret format example: + //// [["coreserver.txneedrepair", [["1675180029481-0", ["a", "b"]]]]] + ////////////////////////////////////////////////////////////////////// + if err != nil { + log.Warningf("fetchMessage meassage err:%s", err) + return msg, errors.New("fetchMessage meassage err") + } + if len(ret) == 0 { + log.Warningf("redis stream result length is 0, which shoud bigger than 0") + return msg, errors.New("redis fetchMessage ret length is 0") + } + + qmsg, _ = redis.Values(ret[0], nil) // ["coreserver.txneedrepair", [["1675180029481-0", ["a", "b"]]]] + msgs, _ = redis.Values(qmsg[1], nil) // [["1675180029481-0", ["a", "b"]]] + + if len(msgs) == 0 && r.restart { + r.restart = false + continue + } + if len(msgs) != 1 { + log.Warningf("meassage length from xstream is not 1(but %d)", len(msgs)) + return msg, errors.New("no new msg") + } + break + } + + // Step 2. parse xstream item result + var ( + amsg, _ = redis.Values(msgs[0], nil) // ["1675180029481-0", ["a", "b"]] + rmsg, _ = redis.Values(amsg[1], nil) // ["a", "b"] + msgid, _ = redis.String(amsg[0], nil) // "1675180029481-0" + msgval, _ = redis.String(rmsg[1], nil) // "b" + ) + msg.Id = msgid + msg.Value = []byte(msgval) + log.Infof("success fetched meassage '%s': %s", msg.Id, msgval) + return msg, nil +} + +func (r *xstreamReader) CommitMessages(ctx context.Context, msg Message) (err error) { + args := []interface{}{ + r.cfg.Stream, // key + r.cfg.Group, // group + msg.Id, // id + } + + if cnt, err := redis.Int(r.redisDo("XACK", args...)); err != nil { + return err + } else if cnt != 1 { + log.Warningf("attention: 'XACK' return %d (not 1), but we trade as success", cnt) + } + return nil +} + +func (r *xstreamReader) redisDo(cmd string, args ...any) (reply any, err error) { + if r.closed { + return nil, net.ErrClosed + } else if r.conn.Err() != nil { + log.Warningf("redis connection closed/unavaliable, we will get new one from pool") + oldconn := r.conn + newconn := r.connPool.Get() + r.conn = newconn + oldconn.Close() + } + + return r.conn.Do(cmd, args...) +} + +func (r *xstreamReader) initGroup() error { + groups, err := r.redisDo("XINfO", "GROUPS", r.cfg.Stream) + groupInfosInf, _ := groups.([]interface{}) + if err != nil || len(groupInfosInf) == 0 { + log.Infof("create group, name: %s", r.cfg.Group) + _, err = r.redisDo("XGROUP", "CREATE", r.cfg.Stream, r.cfg.Group, "$", "MKSTREAM") + return err + } + + for _, groupInfo := range groupInfosInf { + groupInfoInf := groupInfo.([]interface{}) + groupName := string(groupInfoInf[1].([]uint8)) + log.Infof("group name: %s", groupName) + if groupName == r.cfg.Group { + return nil + } + } + log.Infof("create group, name: %s", r.cfg.Group) + _, err = r.redisDo("XGROUP", "CREATE", r.cfg.Stream, r.cfg.Group, "$", "MKSTREAM") + return err +} diff --git a/xstream/writer.go b/xstream/writer.go new file mode 100644 index 0000000..9905a82 --- /dev/null +++ b/xstream/writer.go @@ -0,0 +1,20 @@ +package xstream + +//// func Example() { +//// var ( +//// m = model.NewModelDefault() +//// queue = "coreserver.newaccount" +//// msgval = map[string]interface{}{ +//// "sign": "", +//// "appid": c.AppId, +//// "userid": c.UserId, +//// "account": input.Address, +//// } +//// msgstr, _ = json.Marshal(msgval) +//// ) +//// defer m.Close() +//// +//// if _, err = m.Redis.Do("XADD", queue, "*", "msg", msgstr); err != nil { +//// log.Warningf("!!!!FAILED add newaccount to redis queue, msg is:'%s'", msgstr) +//// } +//// }