🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
# RocketMQ ## 说明 Zebra 集成了 [RocketMQ](https://rocketmq.apache.org/) 用于消息发送和接收。 ## 样例 请参考 [https://gitee.com/gszebra/zebra/tree/master/zebra-sample/rocketmq](https://gitee.com/gszebra/zebra/tree/master/zebra-sample/rocketmq) ## 依赖引入 ```markup <dependency> <groupId>com.guosen</groupId> <artifactId>zebra-rocketmq</artifactId> <version>${zebra.version}</version> </dependency> ``` ## 生产者 ### 配置 #### 配置样例 ```text zebra.rocketmq.namesrvAddr=nameserverIp:nameserverPort zebra.rocketmq.producerGroupName=com-guosen-zebra-sample-rocketmq-producer zebra.rocketmq.producerInstanceName=zebraRocketMQProducer ``` #### 配置说明 | 配置项 | 类型 | 说明 | | :--- | :--- | :--- | | zebra.rocketmq.namesrvAddr | String | namesrvAddr 地址 | | zebra.rocketmq.producerGroupName | String | 生产者 group 名称,填微服务全称 RocketMQ group 名称只支持 %\|a-zA-Z0-9\_-,**请将 . 手工替换为 -** | | zebra.rocketmq.producerInstanceName | String | 生产者实例名称 | | zebra.rocketmq.transactionProducerGroupName | String | **在使用事务生产者时配置** 事务生产者 group 名称,填微服务全称 RocketMQ group 名称只支持 %\|a-zA-Z0-9\_-,**请将 . 手工替换为 -** | | zebra.rocketmq.producerTranInstanceName | String | **在使用事务生产者时配置** 事务生产者实例名称 | ### 代码 使用Spring,将 DefaultMQProducer 依赖注入,然后使用 send 方法发送 Message 对象。 ```java private static final String TOPIC_NAME = "com-guosen-zebra-sample-rocketmq-producer"; private static final String TAG_NAME = "tagA"; @Autowired private DefaultMQProducer producer; public String produce(String key, String value) { byte[] body = value.getBytes(StandardCharsets.UTF_8); Message message = new Message(TOPIC_NAME, TAG_NAME, key, body); String returnInfo = null; try { SendResult sendResult = producer.send(message); String messageId = sendResult.getMsgId(); returnInfo = "Message id is : " + messageId; } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { LOGGER.error("Failed to send message", e); returnInfo = e.getMessage(); } return returnInfo; } ``` ## 消费者 ### 配置 #### 配置样例 ```text zebra.rocketmq.namesrvAddr=nameserverIp:nameserverPort zebra.rocketmq.consumerGroupName=com-guosen-zebra-sample-rocketmq-consumer zebra.rocketmq.consumerInstanceName=zebraRocketMQConsumer zebra.rocketmq.consumerBatchMaxSize=1 zebra.rocketmq.consumerBroadcasting=false zebra.rocketmq.subscribe[0]=com-guosen-zebra-sample-rocketmq-producer:tagA zebra.rocketmq.enableHisConsumer=false zebra.rocketmq.enableOrderConsumer=false ``` #### 配置说明 | 配置项 | 类型 | 说明 | | :--- | :--- | :--- | | zebra.rocketmq.namesrvAddr | String | namesrvAddr 地址 | | zebra.rocketmq.consumerGroupName | String | 消费者 group 名称,填微服务全称 RocketMQ group 名称只支持 %\|a-zA-Z0-9\_-,请将 . 手工替换为 - | | zebra.rocketmq.consumerInstanceName | String | 消费者实例名称 | | zebra.rocketmq.consumerBatchMaxSize | Integer | 一次最大消费多少数量消息 | | zebra.rocketmq.consumerBroadcasting | Boolean | 广播消费 true : 广播 false : 集群 | | zebra.rocketmq.subscribe\[0\] | String | 消费的topic和tag配置,格式为topic:tag topic 必须为所消费的 producerGroupName,否则无法展示 MQ 依赖关系 | | zebra.rocketmq.enableHisConsumer | Boolean | 启动的时候是否消费历史记录 true :启动 false :不启动 | | zebra.rocketmq.enableOrderConsumer | Boolean | 启动顺序消费 true :启动 false :不启动 | ### 代码 Zebra 把 RocketMQ 获取到的消息封装为 Spring 的 ApplicationEvent,开发者实现 ApplicationListener 接口即可消费消息。 ```java @Component public class RocketMQListener implements ApplicationListener<RocketmqEvent> { private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQListener.class); @Override public void onApplicationEvent(RocketmqEvent event) { List<MessageExt> msgs = event.getMsgs(); for (MessageExt messageExt: msgs) { handle(messageExt); } } private void handle(MessageExt messageExt) { String key = messageExt.getKeys(); byte[] body = messageExt.getBody(); String value = new String(body, StandardCharsets.UTF_8); LOGGER.info("Received message, key : {}, value : {}", key, value); } } ```