企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # JMS JMS 即 Java 消息服务(Java Message Service)应⽤程序接⼝,是⼀个 Java 平台中关于⾯向消息中间件 (MOM)的 API,⽤于在两个应⽤程序之间,或分布式系统中发送消息,进⾏异步通信。Java 消息服务是 ⼀个与具体平台⽆关的 API,绝⼤多数 MOM 提供商都对 JMS 提供⽀持 JMS 的消息机制有 2 种模型,⼀种是 Point to Point,表现为队列的形式,发送的消息,只能被⼀个接收者取 ⾛;另⼀种是 Topic,可以被多个订阅者订阅,类似于群发 ActiveMQ 是 JMS 的⼀个实现 # 依赖 ~~~ <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> ~~~ # 配置文件 使⽤ ActiveMQ 时有两种使⽤⽅式,⼀种是使⽤独⽴安装的 ActiveMQ,在⽣产环境推荐使⽤这种;另⼀种 是使⽤基于内存 ActiveMQ ,在调试阶段建议使⽤这种⽅式。 ~~~ # 基于内存的 ActiveMQ spring.activemq.in-memory=true # 不适应连接池 spring.activemq.pool.enabled=false #独⽴安装的 ActiveMQ #spring.activemq.broker-url=tcp://192.168.0.1:61616 #spring.activemq.user=admin #spring.activemq.password=admin #spring.activemq.pool.enabled=true #spring.activemq.pool.max-connections=50 #spring.activemq.packages.trust-all=true ~~~ # 同时支持队列和广播 Spring Boot 集成 ActiveMQ 的项⽬默认只⽀持队列或者⼴播中的⼀种,通过配置项 `spring.jms.pub-sub-domain` 的值来控制,true 为⼴播模式,false 为队列模式,默认情况下⽀持队列模式。 如果需要在同⼀项⽬中既⽀持队列模式也⽀持⼴播模式,可以通过DefaultJmsListenerContainerFactory创建⾃定义的 JmsListenerContainerFactory 实例,之后在 `@JmsListener`注解中通过 containerFactory属性引 ⽤它 分别创建两个⾃定义的 JmsListenerContainerFactory 实例,通过 pubSubDomain 来控制是⽀持队列模式还 是⼴播模式 **然后在消费者接收的⽅法中,指明使⽤ containerFactory 接收消息** ~~~ package com.jdxia.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import javax.jms.ConnectionFactory; @Configuration @EnableJms public class ActiveMQConfig { @Bean("queueListenerFactory") public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(false); return factory; } @Bean("topicListenerFactory") public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(true); return factory; } } ~~~ # 队列 ## 创建队列 定义了⼀个队列 queue 命名为:neo.queue ~~~ package com.jdxia.config; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.jms.Queue; import javax.jms.Topic @Configuration // 在项⽬启动时定义.... public class MqConfig { @Bean public Queue queue() { //队列 return new ActiveMQQueue("neo.queue"); } } ~~~ ## 创建生产者 ~~~ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Component; import javax.jms.Queue; import javax.jms.Topic; //生产者 @Component public class Producer { //Spring 提供发送消息的⼯具类 @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue queue; @Autowired private Topic topic; public void sendQueue(String msg) { System.out.println("send queue msg :" + msg); //转换并发送 this.jmsMessagingTemplate.convertAndSend(this.queue, msg); } } ~~~ ## 创建消费者(多) 当有多个消费者监听⼀个队列时,消费者会⾃动均衡负载的接收消息, 并且每个消息只能有⼀个消费者所接收。 ~~~ package com.jdxia.consumer; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; @Component public class Consumer { // 监控了名为 neo.queue 的队 @JmsListener(destination = "neo.queue", containerFactory = "queueListenerFactory") public void receiveQueue(String text) { System.out.println("Consumer queue msg : " + text); } } ~~~ ~~~ package com.jdxia.consumer; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; @Component public class Consumer2 { @JmsListener(destination = "neo.queue", containerFactory = "queueListenerFactory") public void receiveQueue(String text) { System.out.println("Consumer2 msg : "+text); } } ~~~ ## 测试 ~~~ @Autowired private Producer producer; //OutputCapture 是 Spring Boot 提供的⼀个测试类,它能捕获 System.out 和 System.err 的输出,我们可以利 ⽤这个特性来判断程序中的输出是否执⾏ @Rule public OutputCapture outputCapture = new OutputCapture(); @Test public void test() { for (int i = 0; i < 100; i++) { this.producer.sendQueue("Test queue message" + i); } Thread.sleep(1000L); System.out.println(this.outputCapture.toString().contains("Test queue")); } ~~~ # 广播 ⼴播(Topic)是⼀个发送者多个消费者的模式 ## 创建广播 定义了⼀个⼴播 Topic 命名为:neo.topic ~~~ package com.jdxia.config; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.jms.Queue; import javax.jms.Topic; @Configuration // 在项⽬启动时定义.... public class MqConfig { @Bean public Topic topic() { return new ActiveMQTopic("neo.topic"); } } ~~~ ## 创建生产者 ~~~ package com.jdxia.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Component; import javax.jms.Queue; import javax.jms.Topic; //生产者 @Component public class Producer { //Spring 提供发送消息的⼯具类 @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue queue; @Autowired private Topic topic; public void sendTopic(String msg) { System.out.println("send topic msg :" + msg); this.jmsMessagingTemplate.convertAndSend(this.topic, msg); } } ~~~ ## 创建消费者 ~~~ package com.jdxia.consumer; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; @Component public class Consumer { @JmsListener(destination = "neo.topic", containerFactory = "topicListenerFactory") public void receiveTopic(String text) { System.out.println("Consumer topic msg : " + text); } } ~~~ ## 测试 ~~~ @Autowired private Producer producer; //OutputCapture 是 Spring Boot 提供的⼀个测试类,它能捕获 System.out 和 System.err 的输出,我们可以利 ⽤这个特性来判断程序中的输出是否执⾏ @Rule public OutputCapture outputCapture = new OutputCapture(); @Test public void test() throws InterruptedException { this.producer.sendTopic("Test Topic message"); Thread.sleep(1000L); } ~~~ ~~~ send topic msg :Test Topic message Consumer topic msg : Test Topic message Consumer2 topic msg : Test Topic message ~~~