diff --git a/xstream/reader.go b/xstream/reader.go index 818fa1a..3d95129 100644 --- a/xstream/reader.go +++ b/xstream/reader.go @@ -12,6 +12,78 @@ import ( "qoobing.com/gomod/str" ) +////////////////////////////////////////////////////////////////////////////////// +///////////////////////// example ////////////////////////////////////////////// +////////////////////////////////////////////////////////////////////////////////// +////// const ( +////// CORESERVER_NEWACCOUNT_GROUP = "groupid-coreserver-newaccount" +////// CORESERVER_NEWACCOUNT_QUEUE = "coreserver.newaccount" +////// ) +////// +////// type NewAccountMessage struct { +////// Sign string `json:"sign"` +////// Appid string `json:"appid"` +////// Userid int64 `json:"userid"` +////// Account string `json:"account"` +////// CreateTime time.Time `json:"createtime"` +////// } +////// +////// func runNewAccountMarketingEngine(ctx context.Context) { +////// // r read message from redis +////// cfg := config.Instance() +////// r := NewReader(xstreamConf{ +////// group: CORESERVER_NEWACCOUNT_GROUP, +////// queue: CORESERVER_NEWACCOUNT_QUEUE, +////// sentinelConfig: cfg.CoreserverRedis, +////// }) +////// defer r.Close() +////// +////// +////// var error_wait_time = 0 * time.Millisecond +////// for { +////// // Step 0. wait a moment if need +////// if error_wait_time > 0 { +////// time.Sleep(error_wait_time) +////// } +////// +////// // Step 1. fetch message +////// var err error = nil +////// var msg xstreamMessage +////// var newacctMsg NewAccountMessage +////// log.Infof("waiting fetch newaccount message from redis queue...") +////// if msg, err = r.FetchMessage(ctx); err != nil { +////// log.Errorf("failed fetch message from redis queue, err:%s", err) +////// //error_wait_time = 1000 * time.Millisecond +////// break +////// } +////// +////// // Step 2. decode message +////// if err := json.Unmarshal(msg.Value, &newacctMsg); err != nil { +////// log.Errorf("failed json.unmarshal message[%s], err:%s", string(msg.Value), err) +////// continue +////// } else if newacctMsg.Userid == 0 || newacctMsg.Account == "" { +////// log.Errorf("invalid newaccount message, userid=[%d]", newacctMsg.Userid) +////// continue +////// } +////// log.Infof("fetch newaccount message success, account=[%s]", newacctMsg.Account) +////// +////// // Step 3. handle message +////// // xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx +////// // xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx +////// // xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx +////// // xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx +////// // xxxxxxx handle message xxxxxxxxxxxxxxxxxxxxxxxx +////// +////// // Step 4. send ack to coreserver redis +////// if err := r.CommitMessages(ctx, msg); err != nil { +////// log.Errorf("failed commit message to redis, err:%s", err) +////// error_wait_time = 1000 * time.Millisecond +////// continue +////// } +////// } +////// } +///////////////////////////// end example //////////////////////////////////////////// + var ( redisPools = map[string]*redis.Pool{} )