企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## 在ubuntu 16.04上安装go的zmq4 在用 `go get -v github.com/pebbe/zmq4` 进报错 ``` zmq4 Package libzmq was not found in the pkg-config search path. Perhaps you should add the directory containing `libzmq.pc' to the PKG_CONFIG_PATH environment variable ``` 说明缺少zmq的依赖. 因为zmq4是使用c库封装, 然后用cgo编译的. 所以要先安装zmq的c库 安装可参考: https://askubuntu.com/questions/918140/how-to-install-0mq-on-ubuntu-16-04/928188 ``` wget https://github.com/zeromq/libzmq/releases/download/v4.2.2/zeromq-4.2.2.tar.gz cd zeromq-4.2.2/ tar -xvzf zeromq-4.2.2.tar.gz sudo apt-get install libtool pkg-config build-essential autoconf automake uuid-dev sudo apt-get install checkinstall ./configure make sudo checkinstall sudo ldconfig ``` 然后在执行 ``` go get -v -u github.com/pebbe/zmq4 ``` 即可以安装好zmq4库了 ## zmq消息模式分析 ### request-reply 模式 同步请求应答模式, 发送request以后必须等待reply才能继续发送请求. ### publish-subscribe模式 pub-sub是一组异步模型, publisher向所有的subscriber push 消息 subscriber可以订阅多种消息, subscriber会收到任何匹配的订阅 subscriber可以过滤订阅的消息 subscriber可以向多个publisher订阅 这里的发布与订阅角色是绝对的, 即发布者无法使用recv, 订阅者不能使用send 按照官网的说法,在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq 发送速度太快 在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。     ### pipeline模式 push-pull是一组异步模型 进行并行任务处理 ## 发布订阅demo ``` // // Clone server Model One // package main import ( zmq "github.com/pebbe/zmq4" "fmt" "time" ) type Tick struct { Symbol string Price int } func (this *Tick) Send(socket *zmq.Socket) (err error) { //fmt.Printf("Send to %s: %q\n", socket, kvmsg.frame) _, err = socket.SendMessage(this, 666) return } func recvtick(socket *zmq.Socket) (onetick Tick, err error) { info, err := socket.RecvMessage(0) if err != nil { fmt.Println("err in recv", err) return } for _, v := range info { fmt.Println(v) } return } func publish() { // Prepare our context and publisher socket publisher, _ := zmq.NewSocket(zmq.PUB) publisher.Bind("tcp://*:5556") sequence := int64(1) for ; true; sequence++ { // Distribute as key-value message tick := Tick{"szse", 200} err := tick.Send(publisher) fmt.Println("publish", tick) if err != nil { break } } fmt.Printf("Interrupted\n%d messages out\n", sequence) } func subscribe() { subscriber, _ := zmq.NewSocket(zmq.SUB) subscriber.SetRcvhwm(100000) // or messages between snapshot and next are lost subscriber.SetSubscribe("") subscriber.Connect("tcp://localhost:5556") time.Sleep(time.Second) // or messages between snapshot and next are lost // Now apply pending updates, discard out-of-sequence messages for { _, err := recvtick(subscriber) if err != nil { fmt.Println("sub recv err", err) break // Interrupted } } } func main() { fmt.Println() fmt.Println("pub") go publish() fmt.Println("sub") subscribe() } ``` ## 注意事项 版本一定要对, 无论是c库版本 还是go库版本 ## zmp 封装 ``` package zmqutil import ( "time" zmq "github.com/pebbe/zmq4" "go.uber.org/zap" ) type ( ZmqSubscriber struct { SubAddress string SubFilter string HighWaterMark int RecvTimeout time.Duration HeartbeatTime time.Time HeartbeatInterval time.Duration HeartbeatCheckInterval time.Duration ReconnectInterval time.Duration Parser Parser } Parser interface { Parse(data [][]byte) } ) func NewZmqSubscriber(subAddress string, subFilter string, highWaterMark int, recvTimeout, heartbeatInterval, heartbeatCheckInterval, reconectInterval time.Duration, parser Parser) *ZmqSubscriber { return &ZmqSubscriber{ SubAddress: subAddress, SubFilter: subFilter, HighWaterMark: highWaterMark, RecvTimeout: recvTimeout, HeartbeatInterval: heartbeatInterval, HeartbeatCheckInterval: heartbeatCheckInterval, ReconnectInterval: reconectInterval, Parser: parser, } } func (this *ZmqSubscriber) Run() { subscriber, _ := zmq.NewSocket(zmq.SUB) defer subscriber.Close() subscriber.SetRcvhwm(this.HighWaterMark) subscriber.SetSubscribe(this.SubFilter) // connect to service for { if err := subscriber.Connect(this.SubAddress); err != nil { logger.Warn("Connect to subscribe address failed.", zap.Error(err), zap.String("address", this.SubAddress), zap.Duration("wait to reconnect", this.ReconnectInterval)) time.Sleep(this.ReconnectInterval) } else { this.HeartbeatTime = time.Now() break } } nextHeartbeatCheck := time.Now().Add(this.HeartbeatCheckInterval) for { data, err := subscriber.RecvMessageBytes(zmq.DONTWAIT) if err != nil { time.Sleep(this.RecvTimeout) } else if len(data) != 2 { logger.Warn("Got broken message", zap.ByteStrings("data", data)) } else { // parse && send to channel this.Parser.Parse(data) // update heartbeat timestamp this.HeartbeatTime = time.Now() } now := time.Now() if nextHeartbeatCheck.Before(now) { nextHeartbeatCheck = nextHeartbeatCheck.Add(this.HeartbeatCheckInterval) if now.Sub(this.HeartbeatTime).Seconds() > (this.HeartbeatInterval.Seconds() + 1) { logger.Warn("Heartbeat timeout", zap.Time("last heartbeat time", this.HeartbeatTime), zap.String("try to reconnect to", this.SubAddress)) // reinit go this.Run() return } } } } ```