### 服务质量
默认情况下,生产者发送的消息当匹配到队列中,而此时有消费者.会把所有的消息一次性的打入消费者(如果有多个消费者会平均分配)的本地缓存中的.就算引入了AckOK情况也是如此.当消息数量巨大的时候,那么久会产生oom,内存会爆掉.
还有一种情况是,有两个消费者,一个处理的速度较快,一个处理的速度较慢.如果按上面的方式去分配消息的话,会造成一个消费者很多将消息处理完毕,然后进入空闲状态,而另一个消费者计算压力很大. 这样也会增加总的消费耗时.
### 证明
证明当队列发现消费者后将所有的消息全部打入消费者客户端. 可以发现,生产者生产完数据后,web管理界面没有需要处理的消息,说明此时消息已经全部被发送给了消费者.而消费者并没有一次性全部处理完这些消息,因为没处理一个消息会停止1S.
生产者:
~~~
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
defer conn.Close()
if err != nil {
fmt.Println(err)
return
}
channel, err := conn.Channel()
defer channel.Close()
if err != nil {
fmt.Println(err)
return
}
channel.ExchangeDeclare("myExchange", amqp.ExchangeDirect, true, false, false, false, nil)
channel.QueueDeclare("myQueue", true, false, false, false, nil)
channel.QueueBind("myQueue", "test", "myExchange", false, nil)
for i := 0; i < 100; i++ {
channel.Publish("myExchange", "test", false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte(strconv.Itoa(i)),
})
fmt.Printf("发送: %d\n", i)
}
~~~
消费者:
~~~
conn, err := amqp.Dial("amqp://jack:111111@localhost:5672")
if err != nil {
fmt.Println(err)
}
ch, err := conn.Channel()
if err != nil {
fmt.Println(err)
}
ch.QueueDeclare("myQueue", true, false, false, false, nil)
b := make(chan bool)
msg, err := ch.Consume("myQueue", "", true, false, false, false, nil)
for m := range msg {
fmt.Printf("消费: %s\n", string(m.Body))
time.Sleep(time.Second)
}
<-b
~~~

### 使用qos
修改消费者:
~~~
conn, err := amqp.Dial("amqp://jack:111111@localhost:5672")
if err != nil {
fmt.Println(err)
}
ch, err := conn.Channel()
if err != nil {
fmt.Println(err)
}
ch.QueueDeclare("myQueue", true, false, false, false, nil)
ch.Qos(1, 0, false) //限制获取的消息数量,其中第二个参数和第三个参数rabbit没有实现,可以无视
b := make(chan bool)
msg, err := ch.Consume("myQueue", "", false, false, false, false, nil) //关闭autoack
for m := range msg {
fmt.Printf("消费: %s\n", string(m.Body))
m.Ack(true) //手动ack
time.Sleep(time.Second)
}
<-b
~~~
此时我们看到队列中的消息并没有直接打入消费者,而且保留在了队列中,当队列收到消费者ack确认后,才会推送下一条消息.所以当autoack设置为true的时候,数据还是全部发送给了客户端.只不过是每次发送就是等待了一个ack确认.
需要注意的是,如果autoack设置为true,那么设置qos是无效的.所有数据还是会全部一次性的打入消费者客户端.

- 定义和特征
- 安装
- 基本概念
- 插件管理
- 核心概念
- virtual hosts
- connextion
- exchange
- channel
- queue
- binding
- 工作模式
- simple模式
- work模式
- 订阅模式
- routing模式
- topic模式
- QOS服务质量
- =====分割线=====
- RabbitMQ核心概念
- 初识RabbitMQ
- 什么是AMQP高级消息队列协议
- AMQP核心概念
- RabbitMQ整体架构模型
- 命令行与管控台操作
- RabbitMQ消息生产与消费
- RabbitMQ交换机详解
- 什么是exchange
- direct
- topic
- fanout
- headers
- RabbitMQ绑定,队列,虚拟主机,消息
- RabbitMQ高级特性
- 消息保障100%投递成功
- 幂等性概念及业界主流解决方案
- confirm确认消息
- return返回消息
- 自定义消费者
- 消费端限流策略
- 消费端ack与重回队列机制
- TTL消息
- 死信队列
- RabbitMQ集群架构
