# RocketMQ
## 说明
Zebra 集成了 [RocketMQ](https://rocketmq.apache.org/) 用于消息发送和接收。
## 样例
请参考 [https://gitee.com/gszebra/zebra/tree/master/zebra-sample/rocketmq](https://gitee.com/gszebra/zebra/tree/master/zebra-sample/rocketmq)
## 依赖引入
```markup
<dependency>
<groupId>com.guosen</groupId>
<artifactId>zebra-rocketmq</artifactId>
<version>${zebra.version}</version>
</dependency>
```
## 生产者
### 配置
#### 配置样例
```text
zebra.rocketmq.namesrvAddr=nameserverIp:nameserverPort
zebra.rocketmq.producerGroupName=com-guosen-zebra-sample-rocketmq-producer
zebra.rocketmq.producerInstanceName=zebraRocketMQProducer
```
#### 配置说明
| 配置项 | 类型 | 说明 |
| :--- | :--- | :--- |
| zebra.rocketmq.namesrvAddr | String | namesrvAddr 地址 |
| zebra.rocketmq.producerGroupName | String | 生产者 group 名称,填微服务全称 RocketMQ group 名称只支持 %\|a-zA-Z0-9\_-,**请将 . 手工替换为 -** |
| zebra.rocketmq.producerInstanceName | String | 生产者实例名称 |
| zebra.rocketmq.transactionProducerGroupName | String | **在使用事务生产者时配置** 事务生产者 group 名称,填微服务全称 RocketMQ group 名称只支持 %\|a-zA-Z0-9\_-,**请将 . 手工替换为 -** |
| zebra.rocketmq.producerTranInstanceName | String | **在使用事务生产者时配置** 事务生产者实例名称 |
### 代码
使用Spring,将 DefaultMQProducer 依赖注入,然后使用 send 方法发送 Message 对象。
```java
private static final String TOPIC_NAME = "com-guosen-zebra-sample-rocketmq-producer";
private static final String TAG_NAME = "tagA";
@Autowired
private DefaultMQProducer producer;
public String produce(String key, String value) {
byte[] body = value.getBytes(StandardCharsets.UTF_8);
Message message = new Message(TOPIC_NAME, TAG_NAME, key, body);
String returnInfo = null;
try {
SendResult sendResult = producer.send(message);
String messageId = sendResult.getMsgId();
returnInfo = "Message id is : " + messageId;
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
LOGGER.error("Failed to send message", e);
returnInfo = e.getMessage();
}
return returnInfo;
}
```
## 消费者
### 配置
#### 配置样例
```text
zebra.rocketmq.namesrvAddr=nameserverIp:nameserverPort
zebra.rocketmq.consumerGroupName=com-guosen-zebra-sample-rocketmq-consumer
zebra.rocketmq.consumerInstanceName=zebraRocketMQConsumer
zebra.rocketmq.consumerBatchMaxSize=1
zebra.rocketmq.consumerBroadcasting=false
zebra.rocketmq.subscribe[0]=com-guosen-zebra-sample-rocketmq-producer:tagA
zebra.rocketmq.enableHisConsumer=false
zebra.rocketmq.enableOrderConsumer=false
```
#### 配置说明
| 配置项 | 类型 | 说明 |
| :--- | :--- | :--- |
| zebra.rocketmq.namesrvAddr | String | namesrvAddr 地址 |
| zebra.rocketmq.consumerGroupName | String | 消费者 group 名称,填微服务全称 RocketMQ group 名称只支持 %\|a-zA-Z0-9\_-,请将 . 手工替换为 - |
| zebra.rocketmq.consumerInstanceName | String | 消费者实例名称 |
| zebra.rocketmq.consumerBatchMaxSize | Integer | 一次最大消费多少数量消息 |
| zebra.rocketmq.consumerBroadcasting | Boolean | 广播消费 true : 广播 false : 集群 |
| zebra.rocketmq.subscribe\[0\] | String | 消费的topic和tag配置,格式为topic:tag topic 必须为所消费的 producerGroupName,否则无法展示 MQ 依赖关系 |
| zebra.rocketmq.enableHisConsumer | Boolean | 启动的时候是否消费历史记录 true :启动 false :不启动 |
| zebra.rocketmq.enableOrderConsumer | Boolean | 启动顺序消费 true :启动 false :不启动 |
### 代码
Zebra 把 RocketMQ 获取到的消息封装为 Spring 的 ApplicationEvent,开发者实现 ApplicationListener 接口即可消费消息。
```java
@Component
public class RocketMQListener implements ApplicationListener<RocketmqEvent> {
private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQListener.class);
@Override
public void onApplicationEvent(RocketmqEvent event) {
List<MessageExt> msgs = event.getMsgs();
for (MessageExt messageExt: msgs) {
handle(messageExt);
}
}
private void handle(MessageExt messageExt) {
String key = messageExt.getKeys();
byte[] body = messageExt.getBody();
String value = new String(body, StandardCharsets.UTF_8);
LOGGER.info("Received message, key : {}, value : {}", key, value);
}
}
```
- 简介
- 入门
- Docker方式
- 手工方式
- 环境搭建
- 简述
- 初始化服务器
- 安装 Etcd
- 初始化数据库
- 安装配置中心
- 安装服务中心
- 安装监控中心
- 安装API网关
- 开发第一个微服务
- 简述
- 准备工作
- 接口定义
- 实现
- 配置
- 部署
- 验证
- 微服务开发
- 服务调用
- 服务上下文
- 日志
- 高级特性
- 范化调用
- 异步调用
- ACL 控制
- 慢启动
- 消息大小
- 组件使用
- 简述
- 缓存
- MyBatis
- RocketMQ
- 分库分表
- 读写分离
- 分布式事务
- 分布式锁
- 运维管理
- 服务依赖
- 服务查询
- 服务方法查询
- 配置中心
- API网关配置
- 流量控制
- 简述
- 限流
- 熔断
- 系统保护
- 白名单
- 黑名单
- 监控
- 主动探测
- 调用链
- 日志开关
- 最佳实践
- 常见问题
- 深入 Zebra
- 架构
- 通信协议
- 路标