企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[JMS简介与ActiveMQ实战](http://boy00fly.iteye.com/blog/1103586) **1\. JMS架构** Java 消息服务(Java Message Service,简称JMS)是用于访问企业消息系统的开发商中立的API。企业消息系统可以协助应用软件通过网络进行消息交互。JMS 在其中扮演的角色与JDBC 很相似,正如JDBC 提供了一套用于访问各种不同关系数据库的公共API,JMS 也提供了独立于特定厂商的企业消息系统访问方式。 使用JMS 的应用程序被称为JMS 客户端,处理消息路由与传递的消息系统被称为JMS Provider,而JMS 应用则是由多个JMS 客户端和一个JMS Provider 构成的业务系统。发送消息的JMS 客户端被称为生产者(producer),而接收消息的JMS 客户端则被称为消费者(consumer)。同一JMS 客户端既可以是生产者也可以是消费者。 JMS 的编程过程很简单,概括为:应用程序A 发送一条消息到消息服务器(也就是JMS Provider)的某个目得地(Destination),然后消息服务器把消息转发给应用程序B。因为应用程序A 和应用程序B 没有直接的代码关连,所以两者实现了解偶。如下图: ![](http://dl.iteye.com/upload/attachment/503551/5e5693f1-7798-3a24-bf00-81132f1114fc.jpg) 消息传递系统的中心就是消息。一条Message 由三个部分组成: **消息的组成** 1. 头(head) 每条JMS 消息都必须具有消息头。头字段包含用于路由和识别消息的值。可以通过多种方式来设置消息头的值: a. 由JMS 提供者在生成或传送消息的过程中自动设置 b. 由生产者客户机通过在创建消息生产者时指定的设置进行设置 c. 由生产者客户机逐一对各条消息进行设置 2. 属性(property) 消息可以包含称作属性的可选头字段。他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,其中可以包括如下信息:创建数据的进程、数据的创建时间以及每条数据的结构。JMS提供者也可以添加影响消息处理的属性,如是否应压缩消息或如何在消息生命周期结束时废弃消息。 3. 主体(body) 包含要发送给接收应用程序的内容。每个消息接口特定于它所支持的内容类型。JMS为不同类型的内容提供了他们各自的消息类型,但是所有消息都派生自Message接口。 StreamMessage   一种主体中包含Java基元值流的消息。其填充和读取均按顺序进行。 MapMessage     一种主体中包含一组键--值对的消息。没有定义条目顺序。 TextMessage       一种主体中包含Java字符串的消息(例如,XML消息)。 ObjectMessage    一种主体中包含序列化Java对象的消息。 BytesMessage     一种主体中包含连续字节流的消息。 例如:MapMessage 消息格式 Json代码  ![收藏代码](http://boy00fly.iteye.com/images/icon_star.png) 1. MapMessage={   2.     Header={   3.         ... standard headers ...   4.         CorrelationID={123-00001}   5.     }   6.     Properties={   7.         AccountID={Integer:1234}   8.     }   9.     Fields={   10.         Name={String:Mark}   11.         Age={Integer:47}   12.     }    13. }   **消息的传递模型** JMS支持两种消息传递模型:点对点(point-to-point,简称PTP)和发布/订阅(publish/subscribe,简称pub/sub)。这两种消息传递模型非常相似,但有以下区别: a. PTP消息传递模型规定了一条消息之恩能够传递费一个接收方。 b. Pub/sub消息传递模型允许一条消息传递给多个接收方 每个模型都通过扩展公用基类来实现。例如:javax.jms.Queue和Javax.jms.Topic都扩展自javax.jms.Destination类。 1. 点对点消息传递 通过点对点的消息传递模型,一个应用程序可以向另外一个应用程序发送消息。在此传递模型中,目标类型时队列。消息首先被传送至队列目标,然后从改对垒将消息传送至对此队列进行监听的某个消费者,如下图: ![](http://dl.iteye.com/upload/attachment/503555/9e565035-b8bd-3697-9c15-81c2b33364b0.jpg) 一个队列可以关联多个队列发送方和接收方,但一条消息仅传递给一个接收方。如果多个接收方正在监听队列上的消息,JMS Provider将根据“先来者优先”的原则确定由哪个价售房接受下一条消息。如果没有接收方在监听队列,消息将保留在队列中,直至接收方连接到队列为止。这种消息传递模型是传统意义上的拉模型或轮询模型。在此列模型中,消息不时自动推动给客户端的,而是要由客户端从队列中请求获得。 2. 发布/订阅消息传递 通过发布/订阅消息传递模型,应用程序能够将一条消息发送到多个接收方。在此传送模型中,目标类型是主题。消息首先被传送至主题目标,然后传送至所有已订阅此主题的或送消费者。如下图: ![](http://dl.iteye.com/upload/attachment/503557/666b62ef-c254-3214-862b-8db4a137e360.jpg) 主题目标也支持长期订阅。长期订阅表示消费者已注册了主题目标,但在消息到达目标时改消费者可以处于非活动状态。当消费者再次处于活动状态时,将会接收该消息。如果消费者均没有注册某个主题目标,该主题只保留注册了长期订阅的非活动消费者的消息。与PTP消息传递模型不同,pub/sub消息传递模型允许多个主题订阅者接收同一条消息。JMS一直保留消息,直至所有主题订阅者都接收到消息为止。pub/sub消息传递模型基本上时一个推模型。在该模型中,消息会自动广播,消费者无须通过主动请求或轮询主题的方法来获得新的消息。 上面两种消息传递模型里,我们都需要定义消息生产者和消费者,生产者吧消息发送到JMS Provider的某个目标地址(Destination),消息从该目标地址传送至消费者。消费者可以同步或异步接收消息,一般而言,异步消息消费者的执行和伸缩性都优于同步消息接收者,体现在: 1\. 异步消息接收者创建的网络流量比较小。单向对东消息,并使之通过管道进入消息监听器。管道操作支持将多条消息聚合为一个网络调用。 2\. 异步消息接收者使用线程比较少。异步消息接收者在不活动期间不使用线程。同步消息接收者在接收调用期间内使用线程,结果线程可能会长时间保持空闲,尤其是如果该调用中指定了阻塞超时。 3.对于服务器上运行的应用程序代码,使用异步消息接收者几乎总是最佳选择,尤其是通过消息驱动Bean。使用异步消息接收者可以防止应用程序代码在服务器上执行阻塞操作。而阻塞操作会是服务器端线程空闲,甚至会导致死锁。阻塞操作使用所有线程时则发生死锁。如果没有空余的线程可以处理阻塞操作自身解锁所需的操作,这该操作永远无法停止阻塞。 **2\. JMS Provider(ActiveMQ)** **特性及优势** 1\. 实现JMS1.1规范,支持J2EE1.4以上。 2\. 可运行与任何JVM和大部分web容器(ActiveMQ works great in any JVM) 3\. 支持多种语言客户端(java, C, C++, Ajax, ActionScript等等) 4\. 支持多种协议(stomp, openwire, REST) 5\. 良好的Spring支持(ActiveMQ has great Spring Support) 6\. 速度很快,JBossMQ的十倍(ActiveMQ is very fast; often 10x faster than JBossMQ) 7\. 与OpenJMS、JBossMQ等开源jms provider相比,ActiveMQ有apache的支持,持续发展的优势明显 **Queue与Topic的比较** 1. JMS Queue执行load balancer语义     一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它讲被保存一直到能处理该message的consumer可用。如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另外一个consumer那儿。一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡。 2. Topic实现publish和subscribe语义     一条消息被publish时,他将发送给所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的subscriber能够获得消息的一个拷贝。 3. 分别对应两种消息模式     Point-to-Point(点对点),Publisher/Subscriber Model(发布/订阅者)     其中在Publicher/Subscriber模式下又有Nondurable subscription(非持久化订阅)和durable subscription(持久化订阅)两种消息处理方式。 **Point-to-Point(点对点)消息模式开发流程** 1. 生产者(producer)开发流程     1.1 创建 Connection Java代码  ![收藏代码](http://boy00fly.iteye.com/images/icon_star.png) 1. // 根据url,user和password创建一个jms Connection。   2. ActiveMQConnectionFactory connectionFactory   =   new ActiveMQConnectionFactory (user, password, url);   3. connection = connectionFactory.createConnection();   4. connection.start();       1.2 创建Session Java代码  ![收藏代码](http://boy00fly.iteye.com/images/icon_star.png) 1. /**在connection的基础上创建一个session,同时设置是否支持事务ACKNOWLEDGE标识。  2. • AUTO_ACKNOWLEDGE:自动确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收。   3. • CLIENT_ACKNOWLEDGE:客户端确认模式。会话对象依赖于应用程序对被接收的消息调用一个acknowledge()方法。一旦这个方法被调用,会话会确认最后一次确认之后所有接收到的消息。这种模式允许应用程序以一个调用来接收,处理并确认一批消息。注意:在管理控制台中,如果连接工厂的Acknowledge Policy(确认方针)属性被设置为"Previous"(提前),但是你希望为一个给定的会话确认所有接收到的消息,那么就用最后一条消息来调用acknowledge()方法。   4. • DUPS_OK_ACKNOWLEDGE:允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。注意:如果你的应用程序无法处理重复的消息的话,你应该避免使用这种模式。如果发送消息的初始化尝试失败,那么重复的消息可以被重新发送。  5. •  SESSION_TRANSACTED**/   6. Session session = connection.createSession(   7. transacted, Session.AUTO_ACKNOWLEDGE);       1.3 创建Destination对象 Java代码  ![收藏代码](http://boy00fly.iteye.com/images/icon_star.png) 1. //需指定其对应的主题(subject)名称,producer和consumer将根据subject来发送/接收对应的消息   2. if (topic) {   3.     destination = session.createTopic(subject);   4. } else {   5.     destination = session.createQueue(subject);   6. }       1.4 创建MessageProducer Java代码  ![收藏代码](http://boy00fly.iteye.com/images/icon_star.png) 1. //根据Destination创建MessageProducer对象,同时设置其持久模式。    2. MessageProducer producer = session.createProducer(destination);   3. if (persistent) {   4.       producer.setDeliveryMode(DeliveryMode.PERSISTENT);   5. } else {   6.       producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);   7. }   8. if (timeToLive != 0) {   9.        producer.setTimeToLive(timeToLive);   10. }        1.5 发送消息到队列(Queue) Java代码  ![收藏代码](http://boy00fly.iteye.com/images/icon_star.png) 1. //封装TextMessage消息,使用MessageProducer的send方法将消息发送出去。   2. TextMessage message = session.createTextMessage("createMessageText");   3. producer.send(message);   2. 消费者(consumer)开发流程     2.1 实现MessageListener接口 Java代码  ![收藏代码](http://boy00fly.iteye.com/images/icon_star.png) 1. //消费者类必须实现MessageListener接口,然后在onMessage方法中监听消息到达处理。       2.2 创建Connection Java代码  ![收藏代码](http://boy00fly.iteye.com/images/icon_star.png) 1. //根据url,user和password创建一个jms Connection,如果是durable模式,还需要给connection设置一个clientId。   2. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);   3. Connection connection = connectionFactory.createConnection();   4. if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {   5.       connection.setClientID(clientId);   6. }   7. connection.setExceptionListener(this);   8. connection.start();       2.3 创建Session和Destination Java代码  ![收藏代码](http://boy00fly.iteye.com/images/icon_star.png) 1. //与产品类似       2.4 创建replayProducer【可选】 Java代码  ![收藏代码](http://boy00fly.iteye.com/images/icon_star.png) 1. //可以用来将消息处理结果发送给producer。    2. replyProducer = session.createProducer(null);   3. replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);"white-space: normal;">        2.5 创建MessageConsumer Java代码  ![收藏代码](http://boy00fly.iteye.com/images/icon_star.png) 1. //根据Destination创建MessageConsumer对象。   2. MessageConsumer consumer = null;   3. if (durable && topic) {   4.      consumer = session.createDurableSubscriber((Topic)destination, consumerName);   5. } else {   6.      consumer = session.createConsumer(destination);   7. }"white-space: normal;">        2.6 消费message Java代码  ![收藏代码](http://boy00fly.iteye.com/images/icon_star.png) 1. //在onMessage()方法中接收producer发送过来的消息进行处理,并可以通过replyProducer反馈信息给producer    2. if (message.getJMSReplyTo() != null) {    3.     replyProducer.send(message.getJMSReplyTo(),    4.     session.createTextMessage("Reply: " + message.getJMSMessageID()));    5. }     **Publish/Subscriber(发布/订阅者)消息开发模式** 1\. 订阅者(Subscriber)开发流程     1.1 实现MessageListener接口   在onMessage()方法中监听发布者发出的消息队列,并做相应处理。     1.2 创建Connection           根据url, user和password创建一个jms connection     1.3 创建Session   在connection的基础上创建一个session,同时设置是否支持和ACKNOWLEDGE标志。     1.4 创建 Topic           创建两个Topic,topictest.message用于接收发布者发出的消息,topictest.control用于向发布者发送消息,实现双方的交互。     1.5 创建consumer和producer对象   根据topictest.message创建consumer,根据topictest.control创建producer     1.6 接收处理消息   在onMessage()方法中,对收到的消息进行处理,可直接简单在本地显示消息,或者根据消息内容不同处理对应的业务逻辑(比如:数据库更新、文件操作等等),并且可以使用   producer对象处理结果返回给发布者。 2\. 发布者(Publisher)开发流程      2.1 实现MessageListener接口     在onMessage()方法中接收订阅者的反馈消息。      2.2 创建Connection    根据url, user和password 创建一个 jms Connection。      2.3 创建session    在connection的基础上创建一个session,同时设置是否支持事务和ACKNOWLEDGE标志。      2.4 创建Topic    创建两个Topic,topictest.messages用于向订阅者发布消息,topictest.control用于接收订阅者反馈的消息。这两个Topic与订阅者开发流程中的topic是一一对应的。      2.5 创建consumer和producer对象    根据topictest.message创建publisher;    根据topictest.control穿件consumer,同时监听订阅者反馈的消息。 **3\. JMS For Spring**       Spring提供了用于简化JMS API使用的抽象框架,并且对用户屏蔽了JMS API中1.0.2和1.1版本的差异。       JMS的功能大致上分为两块,叫做消息制造和消息消耗。JmsTemplate用于制造消息和同步消息接收。和J2EE的事件驱动Bean风格类似,对于异步接收消息,Spring提供了一些消息监听容器来创建消息驱动的POJO(MDP)。 1\. Spring 配置 connectionFactory Xml代码  ![收藏代码](http://boy00fly.iteye.com/images/icon_star.png) 1. 2.     此处需加以注意的是Listener端的borkerURL使用了failover传输方式:    3.     failover:(tcp://localhost:61616)?initialReconnectDelay=100&   4. maxReconnectAttempts=5    5.     failover transport是一种重新连接机制,用于建立可靠的传输。此处配置的是一旦ActiveMQ broker中断,Listener端将每隔100ms自动尝试连接,直至成功连接或重试5次连接失败为止。   6.     failover还支持多个borker同时提供服务,实现负载均衡的同时可增加系统容错性,格式: failover:(uri1,...,uriN)?transportOptions   7. -->   8. bean id="jmsFactory" destroy-method="stop" class="org.apache.activemq.pool.PooledConnectionFactory">   9.         property name="connectionFactory">   10.             bean class="org.apache.activemq.ActiveMQConnectionFactory">   11.                 property name="brokerURL" value="tcp://localhost:61616" />   12.                  property name="userName" value="${activemq.username}" />   13.                 property name="password" value="${activemq.password}" />   14.             bean>   15.         property>   16. bean>      2\. Spring JmsTemplate Xml代码  ![收藏代码](http://boy00fly.iteye.com/images/icon_star.png) 1. 2.         JmsTemplate 类的实例 一经配置便是线程安全 的。 要清楚一点,JmsTemplate    3.         是有状态的,因为它维护了 ConnectionFactory 的引用,但这个状态时不是会话状态。   4. -->   5.     bean id="myJmsTemplate"   6.         class="org.springframework.jms.core.JmsTemplate">   7.         property name="connectionFactory" ref="jmsFactory" />   8.         property name="defaultDestinationName" value="MySubject" />   9.          10. 优先级和存活时间作为服务质量(QOS)参数,   11.             默认{deliveryMode:2(1),priority:4,timeToLive:0}   12.             另一种使用无需QOS参数的缺省值方法。   13.             property name="explicitQosEnabled" value="true"/>   14.             property name="deliveryMode" value="2"/>   15.             property name="priority" value="4"/>   16.                         property name="timeToLive" value="1000"/>   17.          -->   18.           19.               20.          -->   21.     bean>      3\. 发送的接收消息 Xml代码  ![收藏代码](http://boy00fly.iteye.com/images/icon_star.png) 1. bean id="destinationTopic"   2.         class="org.apache.activemq.command.ActiveMQTopic">   3.         constructor-arg index="0" value="HelloWorldTopic" />   4.     bean>   5.        6.     bean id="consumer" class="com.d1xn.jms.demo.Consumer">   7.         property name="jmsTemplate" ref="myJmsTemplate" />   8.     bean>   9.    10.        11.     bean id="producerTopic" class="com.d1xn.jms.demo.ProducerTopic">   12.         property name="jmsTemplet" ref="myJmsTemplate" />   13.         property name="destination" ref="destinationTopic" />   14.     bean>   15.        16.    17.     bean id="listenerContainerTopic"   18. class="org.springframework.jms.listener.DefaultMessageListenerContainer">   19.         property name="connectionFactory" ref="jmsFactory" />   20.         property name="destination" ref="destinationTopic" />   21.         property name="messageListener" ref="consumer" />   22.          >   23.         property name="clientId" value="clientId_001" />   24.         property name="subscriptionDurable" value="true" />   25.         property name="durableSubscriptionName" value="My_001" />   26. bean>   说明(基于ActiveMQ5.4.2版本):         1、Web Console 的安全配置可参考           将conf/jetty.xml下面一段xml配置: Xml代码  ![收藏代码](http://boy00fly.iteye.com/images/icon_star.png) 1. bean id="securityConstraint" class="org.eclipse.jetty.http.security.Constraint">   2.     property name="name" value="BASIC" />                                          3.     property name="roles" value="admin" />                                         4.     property name="authenticate" value="true" />                                   5. bean>                                                                                authenticate值设置为true即可!那用户名/密码的配置是在conf/jetty-realm.properties!    详细可参考[http://activemq.apache.org/web-console.html](http://activemq.apache.org/web-console.html) 2、连接安全配置可参考      将conf/activemq-security.xml中如下的配置 Xml代码  ![收藏代码](http://boy00fly.iteye.com/images/icon_star.png) 1. plugins>                                                                                           2.                                  3.  simpleAuthenticationPlugin anonymousAccessAllowed="false">                                      4.     users>                                                                                       5.         authenticationUser username="system" password="${activemq.password}"                     6.             groups="admins"/>                                                                     7.         authenticationUser username="user" password="${guest.password}"                      8.             groups="users"/>                                                                      9.         authenticationUser username="guest" password="${guest.password}" groups="guests"/>-->    10.     users>                                                                                      11.  simpleAuthenticationPlugin>                                                                    12. plugins>                                                                                                    copy至conf/activemq.xml中 Xml代码  ![收藏代码](http://boy00fly.iteye.com/images/icon_star.png) 1. persistenceAdapter>                                     2.     kahaDB directory="${activemq.base}/data/kahadb"/>   3. persistenceAdapter>                                          的下面(这是简单的用户名、密码的认证方式)!    用户名、密码的可在conf/credentials.properties配置!    详细可参考:      [http://activemq.apache.org/security.html](http://activemq.apache.org/security.html)      [http://activemq.apache.org/xml-reference.html](http://activemq.apache.org/xml-reference.html)