企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # 简介 首先加入RabbitMQ java client依赖: ~~~xml <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.2</version> </dependency> </dependencies> ~~~ RabbitMQ的java client使用`com.rabbitmq.client`作为其顶级包。关键的类和接口是: ~~~ com.rabbitmq.client.Channel com.rabbitmq.client.Connection com.rabbitmq.client.ConnectionFactory com.rabbitmq.client.Consumer ~~~ 通过Channel可以进行一系列的api操作。 Connection(连接)用于打开通道,注册连接生命周期事件处理程序,并关闭不再需要的连接。 Connection(连接)通过ConnectionFactory实例化,ConnectionFactory可以设置一些Collection(连接)的一些配置,比如说vhost或者说username等等。 ## Connections(连接)和Channels(管道) 核心的类是Connections(连接)和Channels(管道),分别代表着AMQP 0-9-1协议中的Connections(连接)和Channels(管道),一般被导入 ~~~ import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; ~~~ ## 连接服务器 下面的代码时使用给定的参数(host name,端口等等)连接AMQP的服务器。 ~~~ ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); Connection conn = factory.newConnection(); ~~~ 所有的这些参数RabbitMQ服务器都设置了默认值,可以在ConnectionFactory类中查看这些默认值。 另外,URI可以以下面的方法进行连接都有默认值。 ~~~ ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost"); Connection conn = factory.newConnection(); ~~~ Connection(连接)接口可以被用作创建一个channel(管道): ~~~ Channel channel = conn.createChannel(); ~~~ 可以使用channel(管道)发送和接收消息,下面会有讲到。 关闭连接,只需要关闭channel(管道)和connection(连接): ~~~ channel.close(); conn.close(); ~~~ 注意,关闭管道是被认为是最佳实践,但是却不是严格意义的必要的。当底层的连接关闭时候,channel(管道)也就自动的被关闭了。 ## 使用Exchanges和Queues 客户端应用必须应用在exchanges和queues,这些都是AMQP协议定义的。使用这些(exchanges和queues)首先必须“声明”它(就是创建的意思)。 下面的代码就是怎样去"声明"一个exchange和队列,并且将它们绑定在一起。 ~~~ channel.exchangeDeclare(exchangeName, "direct", true); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, exchangeName, routingKey); ~~~ 可以通过参数去设置exchange和queue的一些属性,使用这些方法的一些重载方法进行相关设置。 ~~~ channel.exchangeDeclare(exchangeName, "direct", true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); ~~~ ## 发送消息(Publishing messages) 使用Channel.basicPublish方法将消息发送给一个exchange: ~~~ byte[] messageBodyBytes = "Hello, world!".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); ~~~ 为了更好的控制,你可以使用重载的参数来设置消息的一些属性(比如说mandatory标志,关于mandatory标志,下面会讲到),或者在发送消息前设定一些消息属性。 ~~~ channel.basicPublish(exchangeName, routingKey, mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes); ~~~ 可以自己构建BasicProperties的对象,如下面的代码: ~~~ channel.basicPublish(exchangeName, routingKey, new AMQP.BasicProperties.Builder() .contentType("text/plain") .deliveryMode(2) .priority(1) .userId("bob") .build()), messageBodyBytes); ~~~ 发送消息指定头信息: ~~~ Map<String, Object> headers = new HashMap<String, Object>(); headers.put("latitude", 51.5252949); headers.put("longitude", -0.0905493); channel.basicPublish(exchangeName, routingKey, new AMQP.BasicProperties.Builder() .headers(headers) .build()), messageBodyBytes); ~~~ 发送一个有过期时间的消息,下面的博客也会讲到: ~~~ channel.basicPublish(exchangeName, routingKey, new AMQP.BasicProperties.Builder() .expiration("60000") .build()), messageBodyBytes); ~~~ ## 订阅消息("Push API") ~~~ import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; ~~~ 最有效的接收消息的方法是使用Consumer接口去订阅。当消息到达消费端的时候会自动的传递消费(delivered),而不需要去请求。 当我们调用Consumers(消费者)有关的api的时候,会生成一个消费者标识符(consumer tag)。 不同的Consumer实例必须有不同的消费者标签。 强烈建议不要在连接上重复使用消费者标签,不然在监视消费者时可能导致自动连接恢复和混淆监控数据的问题。 实现Consumer的最简单的方法是将便利(convenience)类DefaultConsumer子类化。 该子类的对象可以在basicConsume方法调用中传递以设置订阅: ~~~java boolean autoAck = false; channel.basicConsume(queueName, autoAck, "myConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); long deliveryTag = envelope.getDeliveryTag(); // (process the message components here ...) channel.basicAck(deliveryTag, false); } }); ~~~ 在这里,因为我们设置了自动确认(`autoAck`)的值为false,所以有必要在传递给消费者的方法中进行自动确认(`handleDelivery`方法中)。 更复杂的消费者将会重写更多的方法。事实上,`handleShutdownSignal`方法被调用当Channel(通道)和连接关闭的时候。并且在调用该消费者的任何回调方法之前将`consumer tag`传递给`handleConsumeOk`(com.rabbitmq.client.Consumer接口中定义的方法)方法 消费者还可以分别实现`handleCancelOk`(com.rabbitmq.client.Consumer接口中定义的方法)和`handleCancel`(com.rabbitmq.client.Consumer接口中定义的方法)方法来通知显式和隐式取消。 你也可以使用Channel.basicCancel方法明确的取消一个特定的消费,传递consumer tag, ~~~ channel.basicCancel(consumerTag); ~~~ 和生产者一样,对于消费者来说并发处理消息也要慎重考虑。 回调给消费者是在与实例化其`Channel`(管道)的线程分开的线程池中调度的。 这意味着消费者可以安全地在`Connection`或`Channel`上调用阻塞方法,例如`Channel#queueDeclare`或`Channel#basicCancel`。 每一个`Channel`(管道)都有自己的调度线程。对于最常用的使用方式就是一个消费者一个`Channel`(管道),意味着一个消费者不会阻塞其他的消费。如果是一个`Channel`(管道)多消费者必须明白一个长时间的消费调用可能会阻塞其他消费者的回调调度。 # 通道和并发注意事项(线程安全) 根据经验,**在线程间共享Channel(通道)是要避免的**。应用应该优先使用每个线程自己的Channel(通道)实例,而不是多个线程共享这个Channel(通道)实例。 虽然有些在Channel(通道)上的操作是可以并发安全的调用,但是一些操作不行会导致一些边界交错,双重确认等等。 在共享(多线程)Channel(通道)上进行并发发布会导致一些边界交错,触发连接协议异常和连接关闭。因此需要严格在应用中同步调用(Channel#basicPublish必须在正确关键的地方调用)。线程之间的共享也会干扰生产者的消息确认。我们强烈的推荐不应该在通道上进行并发的发布消息。 在共享的Channel(通道)上一个线程生产(publish)消息,一个线程消费(consume)消息是线程安全的。 服务器推送可以同时发送,保证每通道的订阅被保留。 调度机制使用`java.util.concurrent.ExecutorService`。 可以使用单列的`ConnectionFactory`调用`ConnectionFactory#setSharedExecutor`去设置所有连接共用的`executor`。 当我们手动确认[manual acknowledgements](https://link.jianshu.com?t=http://www.rabbitmq.com/confirms.html) 的时候,很重要的是考虑什么线程去做这个ack确认。如果接收传递的线程(例如,Consumer#handleDelivery委托给不同线程的传递处理)不同于手动确认的线程,则将多个线程参数设置为true是线程不安全的并导致双重确认,因此导致通道协议异常导致Channel关闭。一次确认一条消息可以确保安全的。 # 简单例子 通过ConnectionFactory获得Connection,Connection得到Channel **Exchange** ~~~ package rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; public class ExchangeTest { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //创建exchange,类型是direct类型 channel.exchangeDeclare("zhihao.miao", "direct"); //创建exchange, 类型是direct类型 channel.exchangeDeclare("zhihao.miao.info", BuiltinExchangeType.DIRECT); //第三个参数表示是否持久化,同步操作,有返回值 AMQP.Exchange.DeclareOk ok = channel.exchangeDeclare("zhihao.miao.debug", BuiltinExchangeType.DIRECT, true); System.out.println("---ok---" + ok); //设置属性 Map<String, Object> argument = new HashMap<>(); argument.put("alternate-exchange", "log"); //第三个是持久化,第四个是是否自动删除 channel.exchangeDeclare("zhihao.miao.warn", BuiltinExchangeType.TOPIC, true, false, argument); //异步创建exchange,没有返回值, channel.exchangeDeclareNoWait("zhihao.miao.log", BuiltinExchangeType.TOPIC, true, false, false, argument); //判断是否存在, 不存在就报错 AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclarePassive("zhihao.miao.info"); System.out.println("---declareOk---" + declareOk); //判断是否存在, 不存在就报错 declareOk = channel.exchangeDeclarePassive("zhihao.miao.debug"); System.out.println("---declareOk2---" + declareOk); //删除exchange(可重复执行), 删除一个不存在的也不会报错 channel.exchangeDelete("zhihao.miao"); channel.exchangeDelete("zhihao.miao.debug"); channel.exchangeDelete("zhihao.miao.info"); channel.exchangeDelete("zhihao.miao.warn"); channel.exchangeDelete("zhihao.miao.log"); //删除exchange channel.exchangeDelete("zhihao.miao.info"); channel.close(); connection.close(); } } ~~~ 队列的api操作 **queues** ~~~ public class QueueTest { public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.131"); connectionFactory.setPort(5672); connectionFactory.setUsername("zhihao.miao"); connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //第二个参数表示是否持久化,第三个参数是判断这个队列是否在连接是否生效,为true表示连接关闭队列删除。 AMQP.Queue.DeclareOk ok = channel.queueDeclare("zhihao.info",true,false,false,null); System.out.println(ok); //异步没有返回值的方法api channel.queueDeclareNoWait("zhihao.info.miao",true,false,false,null); //判断queue是否存在,不存在会抛出异常 //channel.exchangeDeclarePassive("zhihao.info"); //抛出错误 //channel.exchangeDeclarePassive("zhihao.info.miao2"); //exchange和queue进行绑定(可重复执行,不会重复创建) channel.queueBind("zhihao.info","zhihao.miao.order","info"); //异步进行绑定 channel.queueBindNoWait("zhihao.info.miao","zhihao.miao.pay","info",null); //exchange与exchange进行绑定(可重复执行,不会重复创建) channel.exchangeBind("zhihao.miao.email","zhihao.miao.weixin","debug"); //exchange和queue进行解绑(可重复执行) channel.queueUnbind("zhihao.info","zhihao.miao.order","info"); //exchange和exchange进行解绑(可重复执行) channel.exchangeUnbind("zhihao.info.miao","zhihao.miao.pay","debug"); //删除队列 channel.queueDelete("zhihao.info"); channel.close(); connection.close(); } } ~~~ **消息的发送** ~~~java public class Sender { public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2). contentEncoding("UTF-8").build(); //第一个参数是exchange参数,如果是为空字符串,那么就会发送到(AMQP default)默认的exchange,而且routingKey //便是所要发送到的队列名 channel.basicPublish("","zhihao.info.miao",properties,"忘记密码,验证码是1234".getBytes()); channel.basicPublish("","zhihao.miao",properties,"忘记密码,六位验证密码是343sdf".getBytes()); //direct类型的exchange类型的exchange,zhihao.miao.order绑定zhihao.info.miao队列,route key是order channel.basicPublish("zhihao.miao.order","order",properties,"爱奇艺会员到期了".getBytes()); //zhihao.miao.pay绑定zhihao.info.miao队列,route key是order channel.basicPublish("zhihao.miao.pay","pay",properties,"优酷会员到期了".getBytes()); //topic类型的exchange channel.basicPublish("log","user.log",properties,"你的外卖已经送达".getBytes()); channel.basicPublish("log","user.log.info",properties,"你的外卖正在配送中".getBytes()); channel.basicPublish("log","user",properties,"你的投诉已经采纳".getBytes()); channel.close(); connection.close(); } } ~~~ **消息消费** ~~~ public class Consumer { public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.131"); connectionFactory.setPort(5672); connectionFactory.setUsername("zhihao.miao"); connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/"); //客户端的消费消息 Map<String,Object> clientProperties = new HashMap<>(); clientProperties.put("desc","支付系统2.0"); clientProperties.put("author","zhihao.miao"); clientProperties.put("user","zhihao.miao@xxx.com"); connectionFactory.setClientProperties(clientProperties); //给客户端的connetction命名 Connection connection = connectionFactory.newConnection("log队列的消费者"); //给channel起个编号 Channel channel = connection.createChannel(10); //返回consumerTag,也可以通过重载方法进行设置consumerTag String consumerTag = channel.basicConsume("user_log_queue",true,new SimpleConsumer(channel)); System.out.println(consumerTag); TimeUnit.SECONDS.sleep(30); channel.close(); connection.close(); } } ~~~ 具体的消息逻辑,继承DefaultConsumer类重写handleDelivery方法,如果是手工确认消息,会在handleDelivery方法中进行相关的确认(调用相关api) ~~~java public class SimpleConsumer extends DefaultConsumer{ public SimpleConsumer(Channel channel){ super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(consumerTag); System.out.println("-----收到消息了---------------"); System.out.println("消息属性为:"+properties); System.out.println("消息内容为:"+new String(body)); } } ~~~