## 在ubuntu 16.04上安装go的zmq4
在用 `go get -v github.com/pebbe/zmq4` 进报错
```
zmq4 Package libzmq was not found in the pkg-config search path.
Perhaps you should add the directory containing `libzmq.pc' to the PKG_CONFIG_PATH environment variable
```
说明缺少zmq的依赖. 因为zmq4是使用c库封装, 然后用cgo编译的. 所以要先安装zmq的c库
安装可参考: https://askubuntu.com/questions/918140/how-to-install-0mq-on-ubuntu-16-04/928188
```
wget https://github.com/zeromq/libzmq/releases/download/v4.2.2/zeromq-4.2.2.tar.gz
cd zeromq-4.2.2/
tar -xvzf zeromq-4.2.2.tar.gz
sudo apt-get install libtool pkg-config build-essential autoconf automake uuid-dev
sudo apt-get install checkinstall
./configure
make
sudo checkinstall
sudo ldconfig
```
然后在执行
```
go get -v -u github.com/pebbe/zmq4
```
即可以安装好zmq4库了
## zmq消息模式分析
### request-reply 模式
同步请求应答模式, 发送request以后必须等待reply才能继续发送请求.
### publish-subscribe模式
pub-sub是一组异步模型, publisher向所有的subscriber push 消息
subscriber可以订阅多种消息, subscriber会收到任何匹配的订阅
subscriber可以过滤订阅的消息
subscriber可以向多个publisher订阅
这里的发布与订阅角色是绝对的,
即发布者无法使用recv,
订阅者不能使用send
按照官网的说法,在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq
发送速度太快 在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。
### pipeline模式
push-pull是一组异步模型
进行并行任务处理
## 发布订阅demo
```
//
// Clone server Model One
//
package main
import (
zmq "github.com/pebbe/zmq4"
"fmt"
"time"
)
type Tick struct {
Symbol string
Price int
}
func (this *Tick) Send(socket *zmq.Socket) (err error) {
//fmt.Printf("Send to %s: %q\n", socket, kvmsg.frame)
_, err = socket.SendMessage(this, 666)
return
}
func recvtick(socket *zmq.Socket) (onetick Tick, err error) {
info, err := socket.RecvMessage(0)
if err != nil {
fmt.Println("err in recv", err)
return
}
for _, v := range info {
fmt.Println(v)
}
return
}
func publish() {
// Prepare our context and publisher socket
publisher, _ := zmq.NewSocket(zmq.PUB)
publisher.Bind("tcp://*:5556")
sequence := int64(1)
for ; true; sequence++ {
// Distribute as key-value message
tick := Tick{"szse", 200}
err := tick.Send(publisher)
fmt.Println("publish", tick)
if err != nil {
break
}
}
fmt.Printf("Interrupted\n%d messages out\n", sequence)
}
func subscribe() {
subscriber, _ := zmq.NewSocket(zmq.SUB)
subscriber.SetRcvhwm(100000) // or messages between snapshot and next are lost
subscriber.SetSubscribe("")
subscriber.Connect("tcp://localhost:5556")
time.Sleep(time.Second) // or messages between snapshot and next are lost
// Now apply pending updates, discard out-of-sequence messages
for {
_, err := recvtick(subscriber)
if err != nil {
fmt.Println("sub recv err", err)
break // Interrupted
}
}
}
func main() {
fmt.Println()
fmt.Println("pub")
go publish()
fmt.Println("sub")
subscribe()
}
```
## 注意事项
版本一定要对, 无论是c库版本 还是go库版本
## zmp 封装
```
package zmqutil
import (
"time"
zmq "github.com/pebbe/zmq4"
"go.uber.org/zap"
)
type (
ZmqSubscriber struct {
SubAddress string
SubFilter string
HighWaterMark int
RecvTimeout time.Duration
HeartbeatTime time.Time
HeartbeatInterval time.Duration
HeartbeatCheckInterval time.Duration
ReconnectInterval time.Duration
Parser Parser
}
Parser interface {
Parse(data [][]byte)
}
)
func NewZmqSubscriber(subAddress string, subFilter string, highWaterMark int,
recvTimeout, heartbeatInterval, heartbeatCheckInterval, reconectInterval time.Duration, parser Parser) *ZmqSubscriber {
return &ZmqSubscriber{
SubAddress: subAddress,
SubFilter: subFilter,
HighWaterMark: highWaterMark,
RecvTimeout: recvTimeout,
HeartbeatInterval: heartbeatInterval,
HeartbeatCheckInterval: heartbeatCheckInterval,
ReconnectInterval: reconectInterval,
Parser: parser,
}
}
func (this *ZmqSubscriber) Run() {
subscriber, _ := zmq.NewSocket(zmq.SUB)
defer subscriber.Close()
subscriber.SetRcvhwm(this.HighWaterMark)
subscriber.SetSubscribe(this.SubFilter)
// connect to service
for {
if err := subscriber.Connect(this.SubAddress); err != nil {
logger.Warn("Connect to subscribe address failed.",
zap.Error(err),
zap.String("address", this.SubAddress),
zap.Duration("wait to reconnect", this.ReconnectInterval))
time.Sleep(this.ReconnectInterval)
} else {
this.HeartbeatTime = time.Now()
break
}
}
nextHeartbeatCheck := time.Now().Add(this.HeartbeatCheckInterval)
for {
data, err := subscriber.RecvMessageBytes(zmq.DONTWAIT)
if err != nil {
time.Sleep(this.RecvTimeout)
} else if len(data) != 2 {
logger.Warn("Got broken message", zap.ByteStrings("data", data))
} else {
// parse && send to channel
this.Parser.Parse(data)
// update heartbeat timestamp
this.HeartbeatTime = time.Now()
}
now := time.Now()
if nextHeartbeatCheck.Before(now) {
nextHeartbeatCheck = nextHeartbeatCheck.Add(this.HeartbeatCheckInterval)
if now.Sub(this.HeartbeatTime).Seconds() > (this.HeartbeatInterval.Seconds() + 1) {
logger.Warn("Heartbeat timeout",
zap.Time("last heartbeat time", this.HeartbeatTime),
zap.String("try to reconnect to", this.SubAddress))
// reinit
go this.Run()
return
}
}
}
}
```
- 命令行库cobra
- 用户路径检测go-homedir
- 配置解决方案viper(cobra配置用)
- 高效结构化日志库zap
- RPC框架grpc
- mongdb操作mgo
- ORM库xorm
- GRPCrest接口grpcgateway
- 使用gogoproto时grpcgateway的protobuf和json转换方法
- sync.Map
- zmq
- gogoproto
- go类型转换和类型断言
- go select用法详解以及定时器
- go并发资源竞争
- 官方命令行库flag
- 配置文件解析器 robig/config
- interface {} 接口
- goroutine && channel
- go 命名
- 类型switch
- 数据
- 初始化
- 指针方法 && 值方法
- 内嵌
- mqtt go实现
- grpc middleware