### 订阅模式
消息被路透投递给多个队列,一个消息被多个消费者获取.

### 特点
1. 需要声明交换机.
2. 交换机类型为fanout.
2. 不需要routingKey.
3. 需要匿名队列.(是否需要自动删除)
4. 将匿名队列和交换机绑定.
### 实例
~~~
const MQURL = "amqp://guest:guest@localhost:5672"
type RabbitMQ struct {
conn *amqp.Connection
channel *amqp.Channel
Exchange string //交换机名
Queue string //队列名
Key string //路由名
AMQPUrl string //连接URL
}
//创建rabbit实例
func New(exchange, queue, key string) *RabbitMQ {
r := &RabbitMQ{
Exchange: exchange,
Queue: queue,
Key: key,
AMQPUrl: MQURL,
}
var err error
r.conn, err = amqp.Dial(r.AMQPUrl) // :=不能用于结构体赋值
r.FailOnErr(err, "创建Connection失败")
r.channel, err = r.conn.Channel()
r.FailOnErr(err, "创建channel失败")
return r
}
//处理错误
func (r *RabbitMQ) FailOnErr(err error, msg string) {
if err != nil {
fmt.Printf("%s : %s\n", err, msg)
return
}
}
//创建订阅模式的实例
func NewRabbitMQPubSub(exchange string) *RabbitMQ {
r := New(exchange, "", "") //队列名称为空
var err error
r.conn, err = amqp.Dial(r.AMQPUrl)
r.FailOnErr(err, "创建connection失败")
r.channel, err = r.conn.Channel()
r.FailOnErr(err, "创建channel失败")
return r
}
//订阅模式发布
func (r *RabbitMQ) PublishPub(message string) {
//尝试创建交换机
err := r.channel.ExchangeDeclare(
r.Exchange,
amqp.ExchangeFanout, //订阅类型的交换机是fanout
true,
false,
false,
false,
nil,
)
r.FailOnErr(err, "创建路由器失败")
r.channel.Publish(
r.Exchange,
"", //key是为空的
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
//消费
func (r *RabbitMQ) ConsumePub() {
err := r.channel.ExchangeDeclare(
r.Exchange,
amqp.ExchangeFanout,
true,
false,
false,
false,
nil,
)
if err != nil {
r.FailOnErr(err, "创建交换机失败")
}
//创建队列
q, err := r.channel.QueueDeclare(
"", //队列名称为空
true,
false,
false,
false,
nil,
)
r.FailOnErr(err, "创建队列失败")
r.channel.QueueBind(
q.Name, //绑定随机生成的队列名称
"", //pubsub模式下,这里的key为空
r.Exchange,
false,
nil,
)
msg, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan struct{})
go func() {
for v := range msg {
fmt.Println(string(v.Body))
}
}()
fmt.Println("等待接受消息")
<-forever
}
~~~
消费端
~~~
r :=rabbitmq.NewRabbitMQPubSub("test")
r.ConsumePub()
~~~
产生端
~~~
r := rabbitmq.NewRabbitMQPubSub("test")
for i := 0; i < 100; i++ {
r.PublishPub("hello: " + strconv.Itoa(i))
}
~~~
同样的消息,多个消费端都可以收到.


- 定义和特征
- 安装
- 基本概念
- 插件管理
- 核心概念
- virtual hosts
- connextion
- exchange
- channel
- queue
- binding
- 工作模式
- simple模式
- work模式
- 订阅模式
- routing模式
- topic模式
- QOS服务质量
- =====分割线=====
- RabbitMQ核心概念
- 初识RabbitMQ
- 什么是AMQP高级消息队列协议
- AMQP核心概念
- RabbitMQ整体架构模型
- 命令行与管控台操作
- RabbitMQ消息生产与消费
- RabbitMQ交换机详解
- 什么是exchange
- direct
- topic
- fanout
- headers
- RabbitMQ绑定,队列,虚拟主机,消息
- RabbitMQ高级特性
- 消息保障100%投递成功
- 幂等性概念及业界主流解决方案
- confirm确认消息
- return返回消息
- 自定义消费者
- 消费端限流策略
- 消费端ack与重回队列机制
- TTL消息
- 死信队列
- RabbitMQ集群架构
