🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
[TOC] # 依赖 ~~~ <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> ~~~ # 配置文件 ~~~ # rabbitmq的ip和端口 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 # 账号密码 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 开启发送确认 #spring.rabbitmq.publisher-confirms=true # 开启发送失败退回 #spring.rabbitmq.publisher-returns=true #spring.rabbitmq.dynamic=true # 设置手动应答 #spring.rabbitmq.listener.simple.acknowledge-mode=manual # spring.rabbitmq.listener.direct.acknowledge-mode=manual #spring.rabbitmq.cache.connection.mode=channel # 虚拟主机 #spring.rabbitmq.virtual-host=/ ~~~ # 参数配置详解 ## 基础信息 ~~~ spring.rabbitmq.host: 默认localhost spring.rabbitmq.port: 默认5672 spring.rabbitmq.username: 用户名 spring.rabbitmq.password: 密码 spring.rabbitmq.virtual-host: 连接到代理时用的虚拟主机 spring.rabbitmq.addresses: 连接到server的地址列表(以逗号分隔),先addresses后host spring.rabbitmq.requested-heartbeat: 请求心跳超时时间,0为不指定,如果不指定时间单位默认为妙 spring.rabbitmq.publisher-confirms: 是否启用【发布确认】,默认false spring.rabbitmq.publisher-returns: 是否启用【发布返回】,默认false spring.rabbitmq.connection-timeout: 连接超时时间,单位毫秒,0表示永不超时 ~~~ ## SSL ~~~ spring.rabbitmq.ssl.enabled: 是否支持ssl,默认false spring.rabbitmq.ssl.key-store: 持有SSL certificate的key store的路径 spring.rabbitmq.ssl.key-store-password: 访问key store的密码 spring.rabbitmq.ssl.trust-store: 持有SSL certificates的Trust store spring.rabbitmq.ssl.trust-store-password: 访问trust store的密码 spring.rabbitmq.ssl.trust-store-type=JKS:Trust store 类型. spring.rabbitmq.ssl.algorithm: ssl使用的算法,默认由rabiitClient配置 spring.rabbitmq.ssl.validate-server-certificate=true:是否启用服务端证书验证 spring.rabbitmq.ssl.verify-hostname=true 是否启用主机验证 ~~~ ## 缓存cache ~~~ spring.rabbitmq.cache.channel.size: 缓存中保持的channel数量 spring.rabbitmq.cache.channel.checkout-timeout: 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel spring.rabbitmq.cache.connection.size: 缓存的channel数,只有是CONNECTION模式时生效 spring.rabbitmq.cache.connection.mode=channel: 连接工厂缓存模式:channel 和 connection ~~~ ## Listener ~~~ spring.rabbitmq.listener.type=simple: 容器类型.simple或direct spring.rabbitmq.listener.simple.auto-startup=true: 是否启动时自动启动容器 spring.rabbitmq.listener.simple.acknowledge-mode: 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto spring.rabbitmq.listener.simple.concurrency: 最小的消费者数量 spring.rabbitmq.listener.simple.max-concurrency: 最大的消费者数量 spring.rabbitmq.listener.simple.prefetch: 一个消费者最多可处理的nack消息数量,如果有事务的话,必须大于等于transaction数量. spring.rabbitmq.listener.simple.transaction-size: 当ack模式为auto时,一个事务(ack间)处理的消息数量,最好是小于等于prefetch的数量.若大于prefetch, 则prefetch将增加到这个值 spring.rabbitmq.listener.simple.default-requeue-rejected: 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系) spring.rabbitmq.listener.simple.missing-queues-fatal=true 若容器声明的队列在代理上不可用,是否失败; 或者运行时一个多多个队列被删除,是否停止容器 spring.rabbitmq.listener.simple.idle-event-interval: 发布空闲容器的时间间隔,单位毫秒 spring.rabbitmq.listener.simple.retry.enabled=false: 监听重试是否可用 spring.rabbitmq.listener.simple.retry.max-attempts=3: 最大重试次数 spring.rabbitmq.listener.simple.retry.max-interval=10000ms: 最大重试时间间隔 spring.rabbitmq.listener.simple.retry.initial-interval=1000ms:第一次和第二次尝试传递消息的时间间隔 spring.rabbitmq.listener.simple.retry.multiplier=1: 应用于上一重试间隔的乘数 spring.rabbitmq.listener.simple.retry.stateless=true: 重试时有状态or无状态 spring.rabbitmq.listener.direct.acknowledge-mode= ack模式 spring.rabbitmq.listener.direct.auto-startup=true 是否在启动时自动启动容器 spring.rabbitmq.listener.direct.consumers-per-queue= 每个队列消费者数量. spring.rabbitmq.listener.direct.default-requeue-rejected= 默认是否将拒绝传送的消息重新入队. spring.rabbitmq.listener.direct.idle-event-interval= 空闲容器事件发布时间间隔. spring.rabbitmq.listener.direct.missing-queues-fatal=false若容器声明的队列在代理上不可用,是否失败. spring.rabbitmq.listener.direct.prefetch= 每个消费者可最大处理的nack消息数量. spring.rabbitmq.listener.direct.retry.enabled=false 是否启用发布重试机制. spring.rabbitmq.listener.direct.retry.initial-interval=1000ms # Duration between the first and second attempt to deliver a message. spring.rabbitmq.listener.direct.retry.max-attempts=3 # Maximum number of attempts to deliver a message. spring.rabbitmq.listener.direct.retry.max-interval=10000ms # Maximum duration between attempts. spring.rabbitmq.listener.direct.retry.multiplier=1 # Multiplier to apply to the previous retry interval. spring.rabbitmq.listener.direct.retry.stateless=true # Whether retries are stateless or stateful. ~~~ ## Template ~~~ spring.rabbitmq.template.mandatory: 启用强制信息;默认false spring.rabbitmq.template.receive-timeout: receive() 操作的超时时间 spring.rabbitmq.template.reply-timeout: sendAndReceive() 操作的超时时间 spring.rabbitmq.template.retry.enabled=false: 发送重试是否可用 spring.rabbitmq.template.retry.max-attempts=3: 最大重试次数 spring.rabbitmq.template.retry.initial-interva=1000msl: 第一次和第二次尝试发布或传递消息之间的间隔 spring.rabbitmq.template.retry.multiplier=1: 应用于上一重试间隔的乘数 spring.rabbitmq.template.retry.max-interval=10000: 最大重试时间间隔 ~~~ # 多个数据源 但是很多业务场景下可能监听不同的MQ服务,而每个MQ服务是不同的业务线自己搭建的服务,需要配置多个MQ源。 在springboot 中配置单个RabbitMQ是极其简单的,我们只需要使用Springboot为我们自动装配的RabbitMQ相关的配置就可以了。但是需要配置多个源时,第二个及其以上的就需要单独配置了,这里我使用的都是单独配置的。 **抽象配置** ~~~ @Data public abstract class AbstractRabbitConfiguration { protected String host; protected int port; protected String username; protected String password; protected ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); return connectionFactory; } } ~~~ **第一个源的配置代码** ~~~ import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; @Configuration @ConfigurationProperties("spring.rabbitmq.first") public class FirstRabbitConfiguration extends AbstractRabbitConfiguration { @Bean(name = "firstConnectionFactory") @Primary public ConnectionFactory firstConnectionFactory() { return super.connectionFactory(); } @Bean(name = "firstRabbitTemplate") @Primary public RabbitTemplate firstRabbitTemplate(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } @Bean(name = "firstFactory") public SimpleRabbitListenerContainerFactory firstFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; } @Bean(value = "firstRabbitAdmin") public RabbitAdmin firstRabbitAdmin(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } } ~~~ **第二个源的配置代码** ~~~ import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @ConfigurationProperties("spring.rabbitmq.second") public class SecondRabbitConfiguration extends AbstractRabbitConfiguration { @Bean(name = "secondConnectionFactory") public ConnectionFactory secondConnectionFactory() { return super.connectionFactory(); } @Bean(name = "secondRabbitTemplate") public RabbitTemplate secondRabbitTemplate(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } @Bean(name = "secondFactory") public SimpleRabbitListenerContainerFactory secondFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; } @Bean(value = "secondRabbitAdmin") public RabbitAdmin secondRabbitAdmin(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } } ~~~ **配置信息** ~~~ spring: application: name: multi-rabbitmq rabbitmq: first: host: 192.168.10.76 port: 30509 username: admin password: 123456 second: host: 192.168.10.76 port: 31938 username: admin password: 123456 ~~~ **测试** 这样我们的两个RabbitMQ源就配置好了,接下来我们进行测试使用,为了方便使用,我写了一个MultiRabbitTemplate.class 方便我们使用不同的源。 ~~~ @Component public abstract class MultiRabbitTemplate { @Autowired @Qualifier(value = "firstRabbitTemplate") public AmqpTemplate firstRabbitTemplate; @Autowired @Qualifier(value = "secondRabbitTemplate") public AmqpTemplate secondRabbitTemplate; } ~~~ 第一个消息发送者类 TestFirstSender.class ~~~ @Component @Slf4j public class TestFirstSender extends MultiRabbitTemplate implements MessageSender { @Override public void send(Object msg) { log.info("rabbitmq1 , msg: {}", msg); firstRabbitTemplate.convertAndSend("rabbitmq1", msg); } public void rabbitmq1sender() { this.send("innerpeacez1"); } } ~~~ 第二个消息发送者类 TestSecondSender.class ~~~ @Component @Slf4j public class TestSecondSender extends MultiRabbitTemplate implements MessageSender { @Override public void send(Object msg) { log.info("rabbitmq2 , msg: {}", msg); secondRabbitTemplate.convertAndSend("rabbitmq2", msg); } public void rabbitmq2sender() { this.send("innerpeacez2"); } } ~~~ **动态创建Queue的消费者** ~~~ @Slf4j @Component public class TestFirstConsumer implements MessageConsumer { @Override @RabbitListener(bindings = @QueueBinding(value = @Queue("rabbitmq1") , exchange = @Exchange("rabbitmq1") , key = "rabbitmq1") , containerFactory = "firstFactory") public void receive(Object obj) { log.info("rabbitmq1 , {}", obj); } } ~~~ ~~~ @Slf4j @Component public class TestSecondConsumer implements MessageConsumer { @Override @RabbitListener(bindings = @QueueBinding(value = @Queue("rabbitmq2") , exchange = @Exchange("rabbitmq2") , key = "rabbitmq2") , containerFactory = "secondFactory") public void receive(Object obj) { log.info("rabbitmq2 , {}", obj); } } ~~~ **测试类** ~~~ @RunWith(SpringRunner.class) @SpringBootTest @Slf4j public class SpringBootMultiRabbitmqApplicationTests extends MultiRabbitTemplate { @Autowired private TestFirstSender firstSender; @Autowired private TestSecondSender secondSender; /** * 一百个线程向 First Rabbitmq 的 rabbitmq1 queue中发送一百条消息 */ @Test public void testFirstSender() { for (int i = 0; i < 100; i++) { new Thread(() -> firstSender.rabbitmq1sender() ).start(); } try { Thread.sleep(1000 * 10); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 一百个线程向 Second Rabbitmq 的 rabbitmq2 queue中发送一百条消息 */ @Test public void testSecondSender() { for (int i = 0; i < 100; i++) { new Thread(() -> secondSender.rabbitmq2sender() ).start(); } try { Thread.sleep(1000 * 10); } catch (InterruptedException e) { e.printStackTrace(); } } } ~~~ **也可以这样使用** 生产者 ~~~ //注入template @Autowired private AmqpTemplate firstRabbitTemplate; //发送消息 public void Sender(){ firstRabbitTemplate.convertAndSend("exchange","tdemo.user.update","user updated "); } ~~~ 消费者 ~~~ // 监听器监听指定的queue,这里也可以指定多个队列 @RabbitListener(queues = {"messages"}, containerFactory="firstListenerContainerFactory") public void processMessage(String message) { System.out.println("received messages is : "+message); } ~~~