💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
[TOC] # 消费模型 消息是由生产者发布到kafka集群后,会被消费者消费.消息的消费模型有两种:推送模型(push)和拉取模型(pull). 基于推送模型(push)的消息系统,由消息代理记录消费者的消费状态.消息代理在将消息推送到消费者后,标记这条消息已经消费,但这种方式无法很好保证消息被处理.比如,消息代理把消息发送出去后,当消费进程挂掉或者由于网络原因没有收到这条消息时,就有可能造成消息丢失(因为消息代理已经把这条消息标记为已消费了,但实际上这条消息并没有被实际处理).如果要保证消息被处理,消息代理发送完消息后,要设置状态为"已发送".只有收到消费者的确认请求后才更新为"已消费",这就需要消息代理中记录所有的消费状态,这种做法显然是不可取的. kafka采用拉取模型,由消费者自己记录消费状态,每个消费者互相独立的顺序读取每个分区的消息. 如下图所示,有两个消费者(不同消费者组)拉取同一个主题的消息,消费者A的消费进度是3,消费者B的消费进度是6.消费者拉取的最大上限通过最高水位(watermark)控制,生产者最新写入的消息如果还没有达到备份数量,对消费者是不可见的.这种由消费者控制偏移量的优点是:消费者可以按照任意的顺序消费消息. 比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息.或者直接跳到最近的位置,从当前的时刻开始消费. ![](https://box.kancloud.cn/6fb9dd2f5a3d96b307979aeb5ca9b08d_743x236.png) 在一些消息系统中,消息代理会在消息被消费之后立即删除消息.如果有不同类型的消费者订阅同有一个主题,消息代理可能需要冗余地存储同一消息.或者等所有消费者都消费完才删除,这就需要消息代理跟踪每个消费者的消费状态,这种设计很大程度上限制了消息系统的整体吞吐量和处理延迟.kafka的做法是生产者发布的所有消息会一致保存在kafka集群中,不管消息有没有被消费.用户可以通过设置保留时间来清理过期的数据.比如,设置保留策略为两天.那么,在消息发布后,它可以被不同的消费者消费,在两天之后,过期的消息就会自动清理掉 # 高级API **优点** 高级API写起来简单 **不需要自行去管理offset,系统通过zookeeper自行管理** **不需要管理分区,副本等情况.系统自动管理.** 消费者断线会自动根据上一次记录在zookeeper中的offset去接着获取数据(默认设置1分钟更新一下zookeeper中存的offset) 可以使用group来区分对同一个topic的不同程序范围内离开来(不同的group记录不同的offset,这样不同程序读取同一个topic才不会因为offset互相影响). **缺点** **不能自行控制offset**(对于某些特殊需求来说) 不能细化控制如分区,副本,zk等 # 低级API **优点** **能够让开发者自己控制offset,想从哪里读取就从哪里读取.** 自行控制连接分区,对分区自定义进行负载均衡 对zookeeper的依赖性降低(如:offset不一定非要靠zk存储,自行存储offset即可,比如存在文件或者内存中) **缺点** **太过复杂**,需要自行控制offset,连接那个分区,找到分区leader等 # 消费者组 ![](https://box.kancloud.cn/ba1a4b29dd8e2c68c8953ac160f4f370_718x270.png) 消费者是以consumer group消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个topic.每个分区在同一时间只能由group中的一个消费者读取,但是多个group可以同时消费这个partition. 在图中,有一个由三个消费者组成的group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区.某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者 在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息.另外,如果一个消费者失败了,那么其他的group成员会自动负载均衡之前失败的消费者读取的分区 # 消费方式 consumer采用pull(拉)模式从broker中读取数据. push(推)模式很难适应消费速率不同的消费者,因为消费发送速率是由broker决定的. 它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞.而pull模式可以根据consumer的消费能力以适当的速率消费消息. 对于kafka而言,pull模式更合适,它可以简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式-即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义. pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直等待数据到达.为了避免这种情况,我们在拉请求中有参数,允许消费者请求在等待数据到达的"长轮询"中进行阻塞(并且可选的等待到给定的字节数,以确保大的传输大小). # 消费组案例 需求:测试同一个消费者组中的消费者,同一时刻只能有一个消费者消费。 1. 在hadoop102、hadoop103上修改/opt/module/kafka/config/consumer.properties配置文件中的group.id属性为任意组名。 ~~~ $ vi consumer.properties group.id=console ~~~ 2. 在master,slave1,slave2上分别启动消费者 ~~~ $ bin/kafka-console-consumer.sh --zookeeper master:2181 --topic first --consumer.config config/consumer.properties $ bin/kafka-console-consumer.sh --zookeeper master:2181 --topic first --consumer.config config/consumer.properties ~~~ 3. 在hadoop104上启动生产者 ~~~ $ bin/kafka-console-producer.sh --broker-list master:9092 --topic first >hello world ~~~ 4. 查看hadoop102和hadoop103的接收者 同一时刻只有一个消费者接收到消息 # Consumer和ConsumerGroup ![](https://box.kancloud.cn/269d0a648b8ac08135252b70c7938c06_1828x734.png) 1. consumerGroup之间消费的数据互不干扰 2. 每个consumer对应一个或多个分片,一个分片只能属于一个consumer 3. 多于分片数的consumer消费者,是没有分片的数据可以被消费的 4. 多于consumer只能得到其中一个或多个consumer挂掉的时候才能有机会消费到数据.这个过程叫做rebalnace