💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
# 保证消息的可靠性传输 > 在消息传输的过程中,有三个环节可能会造成消息遗失,分别是**生产者写入队列**,**队列处理消息**,**消费者读取队列** ## RabbitMQ ### 生产者弄丢了消息 * **事务** 数据发送失败返回异常,生产者根据异常回滚事务并重试;数据发送成功时提交事务。 * **confirm机制** 生产者写消息RabbitMQ都会分配一个唯一id,如果RabbitMQ成功接收消息会返回ack消息,如果RabbitMQ没能处理这个消息,会回调生产者的nack接口,生产者可以在nack写重试机制。 #### 小结 事务采用了同步发送消息的机制,而confirm采用了异步机制,拥有更大的吞吐量 ### 队列弄丢了消息 > 由于宕机等原因导致的队列缓存数据丢失 通过开启持久化,将缓存数据写入到磁盘 分两步开启持久化 1. 设置Queue持久化,RabbitMQ只会持久化queue的元数据 2. 向queue写入消息的时候将deliveryMode设置为2,将消息持久化。 消息被持久化到磁盘之前丢失 结合写入消息的confirm机制,消息写入queue并持久化磁盘成功,才通知生产者ack。如果RabbitMQ持久化过程中碰到了问题,回调生产者的nack接口。 #### 小结 通过持久化消息,再结合消息写入的confirm机制,保证了队列消息的可靠性 ### 消费端弄丢了数据 > 消费者从RabbitMQ取到消息之后,还没来得及进行业务处理,消费者的进程意外挂掉了,RabbitMQ会认为消息被消费了,但实际业务还没有处理。 通过RabbitMQ提供的ack机制可以解决这个问题。关闭RabbitMQ的自动ack,消费者处理完成逻辑之后主动提交ack。 ## Kafka 基本架构 由多个broker组成,每个topic被分隔成多个partition,每个partition被复制多份保存在多个broker中。每个系列的partition被抽象为一个replica,每个replica中包含一个leader,剩下的都是follower ### 生产者写入消息失败 生产者提交了消息,但是写入队列失败。 Producer配置acks=all之后,会在写入队列成功之后向Producer发送ack消息。保证了[消息提交]&[消息写入队列]的操作原子性 ### Kafka弄丢了消息 当leader所在的broker节点宕机之后,需要从follower中选举出新的leader ,如果leader到follower的同步还没完成,新的leader中的消息可能是不完整的。 通过以下四步保证replica成员的数据一致性 1. 设置topic,replication.factor > 1,表示每个partition设置的副本个数 2. 设置服务端Kafka,min.insync.replicas > 1,表示每个leader需要感知的follower个数 3. Producer设置acks=all,保证消息写入所有replica之后,才返回ack 4. Producer设置retries=MAX,设置消息写入失败之后重试的次数 ### 消费者弄丢了消息 初始的消息消费流程,消费者得到消息之后立即提交offset向服务器确认消息已消费,再执行业务代码。可能发生消息已消费,但是进程挂掉了,来不及执行业务代码的情况。 修改消息确认消费机制,等到业务代码执行完成之后再向Kafka确认消息以消费。同时需要考虑业务代码的幂等性,保证消息重复消费的问题