发布订阅

使用Send、Flush和Receive方法实现Pub/Sub订阅者:

c.Send("SUBSCRIBE", "example") c.Flush() for {     reply, err := c.Receive()     if err != nil {         return err     }     // process pushed message } 

PubSubConn封装了Conn,为实现订阅者提供了便捷方法。Subscribe、PSubscribe、Unsubscribe和PUnsubscribe方法发送并刷新订阅管理命令。Receive方法将推送的消息转换为对应的类型,便于在type switch中使用。

psc := redis.PubSubConn{Conn: c} psc.Subscribe("example") for {     switch v := psc.Receive().(type) {     case redis.Message:         fmt.Printf("%s: message: %s\n", v.Channel, v.Data)     case redis.Subscription:         fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)     case error:         return v     } } 

PubSubConn

定义

// PubSubConn wraps a Conn with convenience methods for subscribers.
type PubSubConn struct {
	// Conn is the wrapped connection.
	Conn Conn
}

提供的方法:

1.Close 关闭连接
func (c PubSubConn) Close() error

2.PSubscribe 按模式订阅(订阅与给定模式匹配的频道)
func (c PubSubConn) PSubscribe(channel ...interface{}) error

3.PUnsubscribe 取消模式订阅;如果没有给定模式,则取消所有模式订阅
func (c PubSubConn) PUnsubscribe(channel ...interface{}) error

4.Ping 使用指定的数据向服务器发送PING。调用此方法时,连接必须至少订阅了一个频道或模式
func (c PubSubConn) Ping(data string) error

5.Receive 获取消息
func (c PubSubConn) Receive() interface{}

6.ReceiveWithTimeout 带有超时时间的获取消息函数
func (c PubSubConn) ReceiveWithTimeout(timeout time.Duration) interface{}

7.Subscribe 订阅
func (c PubSubConn) Subscribe(channel ...interface{}) error

8.Unsubscribe 取消订阅
func (c PubSubConn) Unsubscribe(channel ...interface{}) error

示例程序:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/gomodule/redigo/redis"
)

// listenPubSubChannels listens for messages on Redis pubsub channels. The
// onStart function is called after the channels are subscribed. The onMessage
// function is called for each message.
//
// The function returns when ctx is canceled, when a health-check ping fails,
// when either callback returns an error, or when the connection reports an
// error. On cancellation it unsubscribes from all channels and waits for the
// receive goroutine to finish before returning.
func listenPubSubChannels(ctx context.Context, redisServerAddr string,
	onStart func() error,
	onMessage func(channel string, data []byte) error,
	channels ...string) error {
	// A ping is sent to the server with this period to test for the health of
	// the connection and server.
	const healthCheckPeriod = time.Minute

	c, err := redis.Dial("tcp", redisServerAddr,
		// Read timeout on server should be greater than ping period.
		redis.DialReadTimeout(healthCheckPeriod+10*time.Second),
		redis.DialWriteTimeout(10*time.Second))
	if err != nil {
		return err
	}
	defer c.Close()

	psc := redis.PubSubConn{Conn: c}

	if err := psc.Subscribe(redis.Args{}.AddFlat(channels)...); err != nil {
		return err
	}

	// Buffered so the receive goroutine can always deliver its result and
	// exit, even if this function has already returned.
	done := make(chan error, 1)

	// Start a goroutine to receive notifications from the server.
	go func() {
		for {
			switch n := psc.Receive().(type) {
			case error:
				done <- n
				return
			case redis.Message:
				if err := onMessage(n.Channel, n.Data); err != nil {
					done <- err
					return
				}
			case redis.Subscription:
				switch n.Count {
				case len(channels):
					// Notify application when all channels are subscribed.
					if err := onStart(); err != nil {
						done <- err
						return
					}
				case 0:
					// Return from the goroutine when all channels are unsubscribed.
					done <- nil
					return
				}
			}
		}
	}()

	ticker := time.NewTicker(healthCheckPeriod)
	defer ticker.Stop()
loop:
	for {
		select {
		case <-ticker.C:
			// Send ping to test health of connection and server. If
			// corresponding pong is not received, then receive on the
			// connection will timeout and the receive goroutine will exit.
			if err := psc.Ping(""); err != nil {
				break loop
			}
		case <-ctx.Done():
			break loop
		case recvErr := <-done:
			// Return error from the receive goroutine.
			return recvErr
		}
	}

	// Signal the receiving goroutine to exit by unsubscribing from all
	// channels. If the command cannot be written the goroutine will never see
	// the unsubscribe confirmation, so return the error right away; the
	// deferred Close then unblocks the goroutine's Receive.
	if err := psc.Unsubscribe(); err != nil {
		return fmt.Errorf("redis pubsub unsubscribe: %w", err)
	}

	// Wait for goroutine to complete.
	return <-done
}

// publish connects to the local Redis server and publishes three sample
// messages to channels c1 and c2. It prints the first error encountered and
// stops publishing.
func publish() {
	c, err := redis.Dial("tcp", "127.0.0.1:6379")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer c.Close()

	messages := []struct {
		channel string
		payload string
	}{
		{"c1", "hello"},
		{"c2", "world"},
		{"c1", "goodbye"},
	}
	for _, m := range messages {
		if _, err := c.Do("PUBLISH", m.channel, m.payload); err != nil {
			fmt.Println(err)
			return
		}
	}
}

// This example shows how receive pubsub notifications with cancelation and
// health checks.
func main() {
	redisServerAddr := "127.0.0.1:6379"

	ctx, cancel := context.WithCancel(context.Background())
	// Release the context's resources on every exit path, not only when the
	// last message arrives.
	defer cancel()

	err := listenPubSubChannels(ctx,
		redisServerAddr,
		func() error {
			// The start callback is a good place to backfill missed
			// notifications. For the purpose of this example, a goroutine is
			// started to send notifications.
			go publish()
			return nil
		},
		func(channel string, message []byte) error {
			fmt.Printf("channel: %s, message: %s\n", channel, message)

			// For the purpose of this example, cancel the listener's context
			// after receiving last message sent by publish().
			if string(message) == "goodbye" {
				cancel()
			}
			return nil
		},
		"c1", "c2")

	if err != nil {
		fmt.Println(err)
		return
	}
}

输出:

channel: c1, message: hello
channel: c2, message: world
channel: c1, message: goodbye