NIUCLOUD是一款SaaS管理后台框架多应用插件+云编译。上千名开发者、服务商正在积极拥抱开发者生态。欢迎开发者们免费入驻。一起助力发展! 广告
### 服务质量 默认情况下,生产者发送的消息当匹配到队列中,而此时有消费者.会把所有的消息一次性的打入消费者(如果有多个消费者会平均分配)的本地缓存中的.就算引入了AckOK情况也是如此.当消息数量巨大的时候,那么久会产生oom,内存会爆掉. 还有一种情况是,有两个消费者,一个处理的速度较快,一个处理的速度较慢.如果按上面的方式去分配消息的话,会造成一个消费者很多将消息处理完毕,然后进入空闲状态,而另一个消费者计算压力很大. 这样也会增加总的消费耗时. ### 证明 证明当队列发现消费者后将所有的消息全部打入消费者客户端. 可以发现,生产者生产完数据后,web管理界面没有需要处理的消息,说明此时消息已经全部被发送给了消费者.而消费者并没有一次性全部处理完这些消息,因为没处理一个消息会停止1S. 生产者: ~~~ conn, err := amqp.Dial("amqp://guest:guest@localhost:5672") defer conn.Close() if err != nil { fmt.Println(err) return } channel, err := conn.Channel() defer channel.Close() if err != nil { fmt.Println(err) return } channel.ExchangeDeclare("myExchange", amqp.ExchangeDirect, true, false, false, false, nil) channel.QueueDeclare("myQueue", true, false, false, false, nil) channel.QueueBind("myQueue", "test", "myExchange", false, nil) for i := 0; i < 100; i++ { channel.Publish("myExchange", "test", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(strconv.Itoa(i)), }) fmt.Printf("发送: %d\n", i) } ~~~ 消费者: ~~~ conn, err := amqp.Dial("amqp://jack:111111@localhost:5672") if err != nil { fmt.Println(err) } ch, err := conn.Channel() if err != nil { fmt.Println(err) } ch.QueueDeclare("myQueue", true, false, false, false, nil) b := make(chan bool) msg, err := ch.Consume("myQueue", "", true, false, false, false, nil) for m := range msg { fmt.Printf("消费: %s\n", string(m.Body)) time.Sleep(time.Second) } <-b ~~~ ![](https://img.kancloud.cn/ce/91/ce9173fbb6d24888f1c05bed38306aa3_1542x312.png) ### 使用qos 修改消费者: ~~~ conn, err := amqp.Dial("amqp://jack:111111@localhost:5672") if err != nil { fmt.Println(err) } ch, err := conn.Channel() if err != nil { fmt.Println(err) } ch.QueueDeclare("myQueue", true, false, false, false, nil) ch.Qos(1, 0, false) //限制获取的消息数量,其中第二个参数和第三个参数rabbit没有实现,可以无视 b := make(chan bool) msg, err := ch.Consume("myQueue", "", false, false, false, false, nil) //关闭autoack for m := range msg { fmt.Printf("消费: %s\n", string(m.Body)) m.Ack(true) //手动ack time.Sleep(time.Second) } <-b ~~~ 此时我们看到队列中的消息并没有直接打入消费者,而且保留在了队列中,当队列收到消费者ack确认后,才会推送下一条消息.所以当autoack设置为true的时候,数据还是全部发送给了客户端.只不过是每次发送就是等待了一个ack确认. 需要注意的是,如果autoack设置为true,那么设置qos是无效的.所有数据还是会全部一次性的打入消费者客户端. ![](https://img.kancloud.cn/40/ee/40ee8b6eebc7e53ae9ab38c706fa54a6_1608x556.png)