多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
[TOC] # 简介 消息的消费 使用容器的方式进行消费 认识一个接口`org.springframework.amqp.rabbit.listener.MessageListenerContainer`, 其默认实现类`org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer` `MessageListenerContainer#setMessageListener`方法,接收的参数类型 `org.springframework.amqp.core.MessageListener`或者`org.springframework.amqp.rabbit.core.ChannelAwareMessageListener`接口 代码: 将`ConnectionFactory`,`RabbitTemplate`,`SimpleMessageListenerContainer`实例纳入到spring容器中进行管理 # SimpleMessageListenerContainer详解 同一个queue上有多个消费者的时候,只会有一个消费者收到消息,一般是多个消费者轮流收到消息。 `SimpleMessageListenerContainer`可以监听多个队列, `container.setQueueNames`的api接收的是一个字符串数组对象 ~~~ @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("zhihao.debug","zhihao.error","zhihao.info"); container.setMessageListener((MessageListener) message -> { System.out.println("====接收到"+message.getMessageProperties().getConsumerQueue()+"队列的消息====="); System.out.println(message.getMessageProperties()); System.out.println(new String(message.getBody())); }); return container; } ~~~ ## SimpleMessageListenerContainer运行时动态的添加监听队列 ~~~java @ComponentScan public class Application { public static void main(String[] args) throws Exception{ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class); SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class); TimeUnit.SECONDS.sleep(20); container.addQueueNames("zhihao.error"); TimeUnit.SECONDS.sleep(20); container.addQueueNames("zhihao.debug"); TimeUnit.SECONDS.sleep(20); context.close(); } } ~~~ SimpleMessageListenerContainer纳入容器 ~~~ @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("zhihao.debug"); container.setMessageListener((MessageListener) message -> { if("zhihao.debug".equals(message.getMessageProperties().getConsumerQueue())){ System.out.println("====接收到"+message.getMessageProperties().getConsumerQueue()+"队列的消息====="); System.out.println(message.getMessageProperties()); System.out.println(new String(message.getBody())); }else if("zhihao.error".equals(message.getMessageProperties().getConsumerQueue())){ System.out.println("====接收到"+message.getMessageProperties().getConsumerQueue()+"队列的消息====="); System.out.println(message.getMessageProperties()); System.out.println(new String(message.getBody())); }else if("zhihao.info".equals(message.getMessageProperties().getConsumerQueue())){ System.out.println("====接收到"+message.getMessageProperties().getConsumerQueue()+"队列的消息====="); System.out.println(message.getMessageProperties()); System.out.println(new String(message.getBody())); } }); return container; } ~~~ ## 运行时动态的移除监听队列 SimpleMessageListenerContainer运行时后动态的移除监听队列 ~~~ container.removeQueueNames("zhihao.debug"); ~~~ ## 后置处理器 SimpleMessageListenerContainer增加后置处理 ~~~csharp @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("zhihao.miao.order"); //后置处理器,接收到的消息都添加了Header请求头 container.setAfterReceivePostProcessors(message -> { message.getMessageProperties().getHeaders().put("desc",10); return message; }); container.setMessageListener((MessageListener) message -> { System.out.println("====接收到消息====="); System.out.println(message.getMessageProperties()); System.out.println(new String(message.getBody())); }); return container; } ~~~ 应用启动类: ~~~java @ComponentScan public class Application { public static void main(String[] args) throws Exception{ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class); SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class); System.out.println(container.getQueueNames()[0]); TimeUnit.SECONDS.sleep(30); context.close(); } } ~~~ 控制台打印: ~~~ ====接收到消息===== MessageProperties [headers={desc=10}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=zhihao.miao.order, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-2xCE8upxgGgf-u1haCwt6A, consumerQueue=zhihao.miao.order] 消息2 ~~~ `setAfterReceivePostProcessors`方法可以对消息进行后置处理。 ## 设置消费者的Consumer\_tag和Arguments ~~~ int count=0; @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("zhihao.miao.order"); //设置消费者的consumerTag_tag container.setConsumerTagStrategy(queue -> "order_queue_"+(++count)); //设置消费者的Arguments Map<String, Object> args = new HashMap<>(); args.put("module","订单模块"); args.put("fun","发送消息"); container.setConsumerArguments(args); container.setMessageListener((MessageListener) message -> { System.out.println("====接收到消息====="); System.out.println(message.getMessageProperties()); System.out.println(new String(message.getBody())); }); return container; } ~~~ ![](https://img.kancloud.cn/49/17/491715fe924a56c2d9d963db2d0e5077_657x466.png) `container.setConsumerTagStrategy`可以设置消费者的 `Consumer_tag`, `container.setConsumerArguments`可以设置消费者的 `Arguments` ## setConcurrentConsumers设置并发消费者 ~~~ @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("zhihao.miao.order"); container.setConcurrentConsumers(5); container.setMaxConcurrentConsumers(10); container.setMessageListener((MessageListener) message -> { System.out.println("====接收到消息====="); System.out.println(message.getMessageProperties()); System.out.println(new String(message.getBody())); }); return container; } ~~~ ![](https://img.kancloud.cn/df/e4/dfe455e66a63a776a6df0b864836e3e7_407x348.png) 应用启动类, ~~~java @ComponentScan public class Application { public static void main(String[] args) throws Exception{ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class); SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class); container.setConcurrentConsumers(7); TimeUnit.SECONDS.sleep(30); context.close(); } } ~~~ ![](https://img.kancloud.cn/7f/b5/7fb5a056eb03ff8ccac8623b9a2744bd_473x403.png) `setConcurrentConsumers`设置多个并发消费者一起消费,并支持运行时动态修改。`setMaxConcurrentConsumers`设置最多的并发消费者。