ThinkChat🤖让你学习和工作更高效,注册即送10W Token,即刻开启你的AI之旅 广告
[TOC] https://mp.weixin.qq.com/s/pTl8RofxRNA6-FjEACdD3g ## kafka 是什么?有什么作用? Kafka 是一个分布式的流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用 主要功能体现于三点: * **消息系统**:kafka与传统的消息中间件都具备**系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性**等功能。与此同时,kafka还提供了大多数消息系统难以实现的消息顺序性保障及回溯性消费的功能。 * **存储系统**:kafka把**消息持久化到磁盘**,相比于其他基于内存存储的系统而言,有效的降低了消息丢失的风险。这得益于其消息持久化和多副本机制。也可以将kafka作为长期的存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题日志压缩功能。 * **流式处理平台**:kafka为流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理框架,比如窗口、连接、变换和聚合等各类操作。 ## kafka 的架构是怎么样的? ![](https://img.kancloud.cn/ce/80/ce800a991b8326abb3bc8a0841e47edb_1080x658.png) Producer 将消息发送到 Broker,Broker 负责将受到的消息存储到磁盘中,而 Consumer 负责从 Broker 订阅并消费消息。 Kafka 基本概念: * **Producer**:生产者,负责将消息发送到 Broker * **Consumer**:消费者,从 Broker 接收消息 * **Consumer Group**:消费者组,由**多个 Consumer 组成**。消费者组内每个消费者负责消费不同分区的数据,**一个分区只能由一个组内消费者消费**;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。 * **Broker**:可以看做一个独立的**Kafka 服务节点或 Kafka 服务实例**。如果一台服务器上只部署了一个 Kafka 实例,那么我们也可以将 Broker 看做一台 Kafka 服务器。 * **Topic**:一个逻辑上的概念,包含很多 Partition,**同一个 Topic 下的 Partiton 的消息内容是不相同的**。 * **Partition**:为了实现扩展性,一个非常大的 topic**可以分布到多个 broker 上,一个 topic 可以分为多个 partition**,每个 partition 是一个有序的队列。 * **Replica**:副本,**同一分区的不同副本保存的是相同的消息**,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。 * **Leader**:每个分区的多个副本中的"主副本",**生产者以及消费者只与 Leader 交互**。 * **Follower**:每个分区的多个副本中的"从副本",**负责实时从 Leader 中同步数据,保持和 Leader 数据的同步**。Leader 发生故障时,从 Follower 副本中重新选举新的 Leader 副本对外提供服务。 ## Kafka Replicas是怎么管理的? ![](https://img.kancloud.cn/b0/fb/b0fb06d2465d7bfbcc264a261accd270_982x516.png) * AR:分区中的**所有 Replica 统称为 AR** * ISR:所有与 Leader 副本**保持一定程度同步**的Replica(包括 Leader 副本在内)组成 ISR * OSR:与 Leader 副本**同步滞后过多的**Replica 组成了 OSR Leader 负责维护和跟踪 ISR 集合中所有 Follower 副本的滞后状态,当 Follower 副本落后过多时,就会将其放入 OSR 集合,当 Follower 副本追上了 Leader 的进度时,就会将其放入 ISR 集合。 默认情况下,**只有 ISR 中的副本才有资格晋升为 Leader**。 ## 如何确定当前能读到哪一条消息? 分区相当于一个日志文件,我们先简单介绍几个概念 ![](https://img.kancloud.cn/83/a8/83a843b54e522bf5055ec6eb8f2db257_1080x525.png) 如上图是一个分区日志文件 * 标识**共有7条消息**,offset (消息偏移量)分别是0~6 * 0 代表这个日志文件的**开始** * HW(High Watermark) 为4,0~3 代表这个日志文件**可以消费的区间**,消费者只能消费到这四条消息 * LEO 代表即将要写入消息的偏移量 offset **分区 ISR 集合中的每个副本都会维护自己的 LEO,而 ISR 集合中最小的LEO 即为分区的 HW** ![](https://img.kancloud.cn/b9/a2/b9a20943ddba57d21151be058eb1cce8_1080x544.png) 如上图: 三个分区副本都是 ISR集合当中的,最小的 LEO 为 3,就代表分区的 HW 为3,所以当前分区只能消费到 0~2 之间的三条数据,如下图 ![](https://img.kancloud.cn/28/75/28756f2da636b0d442389e29f031c9a0_1080x536.png) ## 生产者发送消息有哪些模式? 总共有三种模式 * 1.**发后即忘**(fire-and-forget) * 它只管往 Kafka 里面发送消息,但是**不关心消息是否正确到达**,这种方式的效率最高,但是可靠性也最差,比如当发生某些不可充实异常的时候会造成消息的丢失 * 2.**同步**(sync) * producer.send()返回一个Future对象,调用get()方法变回进行同步等待,就知道消息是否发送成功,**发送一条消息需要等上个消息发送成功后才可以继续发送** * 3.**异步**(async) * Kafka支持 producer.send() 传入一个回调函数,消息不管成功或者失败都会调用这个回调函数,这样就算是异步发送,我们也知道消息的发送情况,然后再回调函数中选择记录日志还是重试都取决于调用方 * ## 发送消息的分区策略有哪些? ![](https://img.kancloud.cn/be/e7/bee728a0e9eca18ba4c550be25f9da96_1080x504.png) * 1.轮询:**依次**将消息发送该topic下的所有分区,如果在创建消息的时候 key 为 null,Kafka 默认采用这种策略。 * 2.key 指定分区:在创建消息是 key 不为空,并且使用默认分区器,Kafka 会将 key 进行 hash,然后**根据hash值映射到指定的分区上**。这样的好处是 key 相同的消息会在一个分区下,Kafka 并不能保证全局有序,但是在每个分区下的消息是有序的,按照顺序存储,按照顺序消费。在保证同一个 key 的消息是有序的,这样基本能满足消息的顺序性的需求。但是**如果 partation 数量发生变化,那就很难保证 key 与分区之间的映射关系了**。 * 3.自定义策略:实现 Partitioner 接口就能自定义分区策略。 * 4.指定 Partiton 发送 ## Kafka 支持读写分离吗?为什么? Kafka 是**不支持读写分离**的,那么读写分离的好处是什么?主要就是让一个节点去承担另一个节点的负载压力,也就是能做到一定程度的负载均衡,而且 Kafka 不通过读写分离也可以一定程度上去实现负载均衡。 ![](https://img.kancloud.cn/c3/99/c3991ef50b5c39aa32127afc19c26881_896x380.png) * 1.数据不一致的问题:读写分离必然涉及到数据的同步,只要是**不同节点之间的数据同步**,必然**会有数据不一致的问题**存在。 * 2.延时问题:由于 Kafka 独特的数据处理方式,导致如果将数据从一个节点同步到另一个节点必然会经过**主节点磁盘和从节点磁盘**,对一些延时性要求较高的应用来说,并不太适用 ## Kafka 是怎么去实现负载均衡的? Kafka 的负责均衡主要是**通过分区来实现**的,我们知道 Kafka 是**主写主读**的架构,如下图: ![](https://img.kancloud.cn/f7/70/f770b107ab76357514e493742648136b_1080x669.png) 共三个 broker ,里面各有三个副本,总共有三个 partation, 深色的是 leader,浅色的是 follower,上下灰色分别代表生产者和消费者,虚线代表 follower 从 leader 拉取消息。 我们从这张图就可以很明显的看出来,**每个 broker 都有消费者拉取消息,每个 broker 也都有生产者发送消息,每个 broker 上的读写负载都是一样的**,这也说明了 kafka 独特的架构方式可以通过主写主读来实现负载均衡。 ## Kafka 的负责均衡会有什么问题呢? kafka的负载均衡在绝对理想的状况下可以实现,但是会有某些情况出现一定程度上的负载不均衡 ![](https://img.kancloud.cn/6e/de/6ede3ec96995face34c6707ad4901b6b_1080x500.png) * 1.**broker 端分配不均**:当创建 topic 的时候可能会出现某些 broker 分配到的分区数多,而有些 broker 分配的分区少,这就导致了 leader 多副本不均。 * 2.**生产者写入消息不均**:生产者可能只对某些 broker 中的 leader 副本进行大量的写入操作,而对其他的 leader 副本不闻不问。 * 3.**消费者消费不均**:消费者可能只对某些 broker 中的 leader 副本进行大量的拉取操作,而对其他的 leader 副本不闻不问。 * 4.**leader 副本切换不均**:当主从副本切换或者分区副本进行了重分配后,可能会导致各个 broker 中的 leader 副本分配不均匀。 ## Kafka 的可靠性是怎么保证的? ![](https://img.kancloud.cn/77/83/77833e6371ec3bd367b39a0315d6ceb0_1080x448.png) **1.acks** 这个参数用来指定分区中有多少个副本收到这条消息,生产者才认为这条消息是写入成功的,这个参数有三个值: * 1.acks = 1,默认为1。生产者发送消息,**只要 leader 副本成功写入消息,就代表成功**。这种方案的问题在于,当返回成功后,如果 leader 副本和 follower 副本还**没有来得及同步**,leader 就崩溃了,那么在选举后新的 leader 就没有这条**消息,也就丢失了**。 * 2.acks = 0。生产者发送消息后直接算写入成功,不需要等待响应。这个方案的问题很明显,**只要服务端写消息时出现任何问题,都会导致消息丢失**。 * 3.acks = -1 或 acks = all。生产者发送消息后,需要等待 ISR 中的所有副本都成功写入消息后才能收到服务端的响应。毫无疑问这种方案的**可靠性是最高**的,但是如果 ISR 中只有leader 副本,那么就和 acks = 1 毫无差别了。 **2.消息发送的方式** 第6问中我们提到了生产者发送消息有三种方式,发完即忘,同步和异步。我们可以**通过同步或者异步**获取响应结果,**失败做重试**来保证消息的可靠性。 **3.手动提交位移** 默认情况下,当消费者消费到消息后,就会自动提交位移。但是如果消费者消费出错,没有进入真正的业务处理,那么就可能会导致这条消息消费失败,从而丢失。我们可以开启手动提交位移,等待业务正常处理完成后,再提交offset。 **4.通过副本 LEO 来确定分区 HW** ## Kafka 的消息消费方式有哪些? 一般消息消费有两种模式,推和拉。Kafka的消费是属于**拉模式**的,而此模式的消息消费方式有**两种,点对点和发布订阅**。 ![](https://img.kancloud.cn/6a/75/6a75b129095872c53f2b39e12d738989_1080x456.png) * 1.**点对点**:如果所有消费者属于同一个消费组,那么所有的消息都会被均匀的投递给每一个消费者,**每条消息只会被其中一个消费者消费**。 ![](https://img.kancloud.cn/19/29/19290f1dd687a6cc95266e8f58ff616b_1080x475.png) * 2.**发布订阅**:如果所有消费者属于不同的消费组,那么所有的消息都会被投递给每一个消费者,**每个消费者都会收到该消息**。 ## 副本 leader 是怎么选举的? 优先副本就是说在 AR 集合中的第一个副本。比如分区 2 的 AR 为 0,1,2,那么分区 2 的优先副本就为0。理想情况下优先副本就是 leader 副本。优先副本选举就是促使优先副本成为 leader 副本,从而维护集群的负载均衡。 ## 如何增强消费者的消费能力? * 1.可以考虑增加 topic 的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。 * 2.如果是消费者消费不及时,可以采用多线程的方式进行消费,并且优化业务方法流程,同样的分区数 ## kafka 控制器是什么?有什么作用 在 Kafka 集群中会有一个或多个 broker,其中有一个 broker 会被选举为控制器,**它负责管理整个集群中所有分区和副本的状态**,kafka 集群中**只能有一个控制器**。 * 当某个分区的 leader 副本出现故障时,由控制器负责**为该分区选举新的 leader 副本**。 * 当检测到某个分区的ISR集合发生变化时,由控制器负责**通知所有 broker 更新其元数据信息**。 * 当为某个 topic 增加分区数量时,由控制器**负责分区的重新分配**。 ## kafka 控制器是怎么进行选举的? 所谓控制器就是一个Borker,在一个kafka集群中,有多个broker节点,但是它们之间需要选举出一个leader,其他的broker充当follower角色。集群中第一个启动的broker会通过在zookeeper中创建临时节点`/controller`来让自己成为控制器,其他broker启动时也会在zookeeper中创建临时节点,但是发现节点已经存在,所以它们会收到一个异常,意识到控制器已经存在,那么就会在zookeeper中创建watch对象,便于它们收到控制器变更的通知。   那么如果控制器由于网络原因与zookeeper断开连接或者异常退出,那么其他broker通过watch收到控制器变更的通知,就会去尝试创建临时节点`/controller`,如果有一个broker创建成功,那么其他broker就会收到创建异常通知,也就意味着集群中已经有了控制器,其他broker只需创建watch对象即可。   如果集群中有一个broker发生异常退出了,那么控制器就会检查这个broker是否有分区的副本leader,如果有那么这个分区就需要一个新的leader,此时控制器就会去遍历其他副本,决定哪一个成为新的leader,同时更新分区的ISR集合。   如果有一个broker加入集群中,那么控制器就会通过Broker ID去判断新加入的broker中是否含有现有分区的副本,如果有,就会从分区副本中去同步数据。   集群中每选举一次控制器,就会通过zookeeper创建一个`controller epoch`,每一个选举都会创建一个更大,包含最新信息的`epoch`,如果有broker收到比这个`epoch`旧的数据,就会忽略它们,kafka也通过这个`epoch`来防止集群产生“脑裂”。 ## kafka 为什么这么快? ![](https://img.kancloud.cn/37/fa/37fae61bd448a4d9824bf5e26ccbc545_1080x543.png) * **1.顺序读写** 磁盘分为顺序读写与随机读写,基于磁盘的随机读写确实很慢,但磁盘的顺序读写性能却很高,kafka 这里采用的就是顺序读写。 * **2.Page Cache** 为了优化读写性能,Kafka 利用了操作**系统本身的 Page Cache**,就是利用操作系统自身的内存而不是JVM空间内存。 * **3.零拷贝** Kafka使用了零拷贝技术,也就是**直接将数据从内核空间的读缓冲区直接拷贝到内核空间的 socket 缓冲区**,然后再写入到 NIC 缓冲区,避免了在内核空间和用户空间之间穿梭。 * **4.分区分段+索引** Kafka 的 message 是按 topic分 类存储的,topic 中的数据又是按照一个一个的 partition 即分区存储到不同 broker 节点。每个 partition 对应了操作系统上的一个文件夹,partition 实际上又是按照segment分段存储的。 通过这种分区分段的设计,Kafka 的 message 消息实际上是分布式存储在一个一个小的 segment 中的,每次文件操作也是直接操作的 segment。为了进一步的查询优化,Kafka 又默认为分段后的数据文件建立了索引文件,就是文件系统上的.index文件。这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度。 * **5.批量读写** Kafka**数据读写也是批量的而不是单条的**,这样可以避免在网络上频繁传输单个消息带来的延迟和带宽开销。假设网络带宽为10MB/S,一次性传输10MB的消息比传输1KB的消息10000万次显然要快得多。 * **6.批量压缩** Kafka 把所有的消息都变成一个**批量的文件**,并且进行合理的**批量压缩**,减少网络 IO 损耗,通过 mmap 提高 I/O 速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合 sendfile 进行直接读取。 ## 什么情况下 kafka 会丢失消息? Kafka 有三次消息传递的过程:生产者发消息给 Broker,Broker 同步消息和持久化消息,Broker 将消息传递给消费者。 ![](https://img.kancloud.cn/ec/a6/eca6d1ac572541cf2527cdd7e516e5b3_1080x437.png) * 1.生产者发送数据: 在第 11 问中的 acks中有说到 * 当 acks 为 0,**只要服务端写消息时出现任何问题,都会导致消息丢失**。 * 当 acks 配置为 1 时,生产者发送消息,只要 leader 副本成功写入消息,就代表成功。这种方案的问题在于,当返回成功后,**如果 leader 副本和 follower 副本还没有来得及同步,leader 就崩溃了,那么在选举后新的 leader 就没有这条消息,也就丢失了**。 * 2.Broker 存储数据:kafka 通过 Page Cache 将数据写入磁盘。 * Page Cache 就是当往磁盘文件写入的时候,系统会先将数据流写入缓存中,但是**什么时候将缓存的数据写入文件中是由操作系统自行决定**。所以**如果此时机器突然挂了,也是会丢失消息的**。 * 3.消费者消费数据:在开启**自动提交 offset**时,只要消费者消费到消息,那么就会自动提交偏移量,**如果业务还没有来得及处理,那么消息就会丢失**。