💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、豆包、星火、月之暗面及文生图、文生视频 广告
### 订阅模式 消息被路透投递给多个队列,一个消息被多个消费者获取. ![](https://img.kancloud.cn/16/f6/16f62df80408647238578f30d793b5cc_1021x363.png) ### 特点 1. 需要声明交换机. 2. 交换机类型为fanout. 2. 不需要routingKey. 3. 需要匿名队列.(是否需要自动删除) 4. 将匿名队列和交换机绑定. ### 实例 ~~~ const MQURL = "amqp://guest:guest@localhost:5672" type RabbitMQ struct { conn *amqp.Connection channel *amqp.Channel Exchange string //交换机名 Queue string //队列名 Key string //路由名 AMQPUrl string //连接URL } //创建rabbit实例 func New(exchange, queue, key string) *RabbitMQ { r := &RabbitMQ{ Exchange: exchange, Queue: queue, Key: key, AMQPUrl: MQURL, } var err error r.conn, err = amqp.Dial(r.AMQPUrl) // :=不能用于结构体赋值 r.FailOnErr(err, "创建Connection失败") r.channel, err = r.conn.Channel() r.FailOnErr(err, "创建channel失败") return r } //处理错误 func (r *RabbitMQ) FailOnErr(err error, msg string) { if err != nil { fmt.Printf("%s : %s\n", err, msg) return } } //创建订阅模式的实例 func NewRabbitMQPubSub(exchange string) *RabbitMQ { r := New(exchange, "", "") //队列名称为空 var err error r.conn, err = amqp.Dial(r.AMQPUrl) r.FailOnErr(err, "创建connection失败") r.channel, err = r.conn.Channel() r.FailOnErr(err, "创建channel失败") return r } //订阅模式发布 func (r *RabbitMQ) PublishPub(message string) { //尝试创建交换机 err := r.channel.ExchangeDeclare( r.Exchange, amqp.ExchangeFanout, //订阅类型的交换机是fanout true, false, false, false, nil, ) r.FailOnErr(err, "创建路由器失败") r.channel.Publish( r.Exchange, "", //key是为空的 false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }) } //消费 func (r *RabbitMQ) ConsumePub() { err := r.channel.ExchangeDeclare( r.Exchange, amqp.ExchangeFanout, true, false, false, false, nil, ) if err != nil { r.FailOnErr(err, "创建交换机失败") } //创建队列 q, err := r.channel.QueueDeclare( "", //队列名称为空 true, false, false, false, nil, ) r.FailOnErr(err, "创建队列失败") r.channel.QueueBind( q.Name, //绑定随机生成的队列名称 "", //pubsub模式下,这里的key为空 r.Exchange, false, nil, ) msg, err := r.channel.Consume( q.Name, "", true, false, false, false, nil, ) forever := make(chan struct{}) go func() { for v := range msg { fmt.Println(string(v.Body)) } }() fmt.Println("等待接受消息") <-forever } ~~~ 消费端 ~~~ r :=rabbitmq.NewRabbitMQPubSub("test") r.ConsumePub() ~~~ 产生端 ~~~ r := rabbitmq.NewRabbitMQPubSub("test") for i := 0; i < 100; i++ { r.PublishPub("hello: " + strconv.Itoa(i)) } ~~~ 同样的消息,多个消费端都可以收到. ![](https://img.kancloud.cn/c0/5b/c05b370bd3ad54c72917865c5ced00f8_473x969.png) ![](https://img.kancloud.cn/41/de/41de2bc46f16c942579377ba61dc1672_449x907.png)