多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
[TOC]

# 高级API

~~~
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Minimal high-level consumer: subscribes to one topic and prints every record it polls.
 */
public class CustomConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Address of the Kafka server; it is not necessary to list every broker here.
        props.put("bootstrap.servers", "master:9092");
        // Consumer group this consumer belongs to.
        props.put("group.id", "g1");
        // Commit offsets automatically.
        props.put("enable.auto.commit", "true");
        // Interval, in milliseconds, between automatic offset commits.
        props.put("auto.commit.interval.ms", "1000");
        // Deserializer class for record keys.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Deserializer class for record values.
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            // Topics to subscribe to; several topics may be listed at once.
            consumer.subscribe(Arrays.asList("test1"));

            while (true) {
                // Wait at most 100 ms for new records.
                ConsumerRecords<String, String> records = consumer.poll(100);
                // A poll returns a batch of records, not a single one, so iterate over it.
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Topic = %s, PartitionId = %d, offset = %d, key = %s, value = %s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } finally {
            // The loop only ends by exception; release the consumer's resources in that case.
            consumer.close();
        }
    }
}
~~~

使用多线程方式创建消费者

~~~
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Consumer task that drains one {@link KafkaStream} and prints every message it receives.
 * {@link #main(String[])} creates the streams for a topic and runs one task per stream.
 */
public class KafkaConsumerSimple implements Runnable {

    /** Label used to identify this consumer in the printed output. */
    public String title;
    /** The stream this task reads from. */
    public KafkaStream<byte[], byte[]> stream;

    /**
     * @param title  label for this consumer
     * @param stream the stream this consumer drains
     */
    public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {
        this.title = title;
        this.stream = stream;
    }

    @Override
    public void run() {
        System.out.println("开始运行 " + title);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        // hasNext() blocks while waiting for new messages; once
        // ConsumerConnector#shutdown is called it returns false and the loop ends.
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> data = it.next();
            String topic = data.topic();
            int partition = data.partition();
            long offset = data.offset();
            // Decode with an explicit charset instead of the platform default.
            String msg = new String(data.message(), StandardCharsets.UTF_8);
            System.out.println(String.format(
                    "Consumer: [%s], Topic: [%s], PartitionId: [%d], Offset: [%d], msg: [%s]",
                    title, topic, partition, offset, msg));
        }
        System.err.println(String.format("Consumer: [%s] exiting ...", title));
    }

    public static void main(String[] args) throws Exception {
        // Consumer configuration.
        Properties props = new Properties();
        props.put("group.id", "testGroup");
        props.put("zookeeper.connect", "master:2181,slave1:2181,slave2:2181");
        // Start consuming from the latest offset.
        props.put("auto.offset.reset", "largest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("partition.assignment.strategy", "roundrobin");
        ConsumerConfig config = new ConsumerConfig(props);

        // Topic to consume.
        String topic = "order";

        // As long as the ConsumerConnector is alive the consumers keep waiting
        // for new messages and do not exit on their own.
        ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);

        // Number of KafkaStreams to create for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);

        // Key is the topic, value is the list of streams created for it.
        Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap =
                consumerConn.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic);

        // One thread per stream: each task blocks on its stream, so a pool smaller
        // than the stream count would leave some streams unread.
        ExecutorService executor = Executors.newFixedThreadPool(streams.size());
        for (int i = 0; i < streams.size(); i++) {
            executor.execute(new KafkaConsumerSimple("消费者" + (i + 1), streams.get(i)));
        }
    }
}
~~~

# 低级API

实现使用低级API读取topic,指定partition,指定offset的数据

消费者使用低级API的主要步骤

| 步骤 | 主要工作 |
|---|---|
| 1 | 根据指定的分区从主题元数据中找到主副本 |
| 2 | 获取分区最新消费进度 |
| 3 | 从主副本拉取分区的消息 |
| 4 | 识别主副本变化,重试 |

方法描述

| 方法 | 描述 |
|---|---|
| findLeader() | 客户端向种子节点发送主题元数据请求,将副本集加入备用节点 |
| getLastOffset() | 消费者客户端发送偏移量请求,获取分区最近的偏移量 |
| run() | 消费者低级API拉取消息的主要方法 |
| findNewLeader() | 当分区的主副本节点发生故障时,客户端需要找出新的主副本 |

~~~
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Low-level consumer that reads a given topic, partition and starting offset.
 */
public class MySimpleConsumer {

    public static void main(String[] args) {
        // Seed brokers of the Kafka cluster.
        ArrayList<String> brokers = new ArrayList<>();
        brokers.add("master");
        brokers.add("slave1");
        brokers.add("slave2");
        // Broker port.
        int port = 9092;
        // Topic.
        String topic = "second";
        // Partition id.
        int partition = 0;
        // Offset to start reading from.
        long offset = 15;

        MySimpleConsumer mySimpleConsumer = new MySimpleConsumer();
        mySimpleConsumer.getData(brokers, port, topic, partition, offset);
    }

    /**
     * Fetches one batch of messages from the partition leader, starting at {@code offset},
     * and prints each message's offset and payload.
     *
     * @param brokers   seed brokers used to look up the partition leader
     * @param port      broker port
     * @param topic     topic to read
     * @param partition partition id to read
     * @param offset    first offset to fetch
     */
    public void getData(List<String> brokers, int port, String topic, int partition, long offset) {
        PartitionMetadata partitionMetadata = getLeader(brokers, port, topic, partition);
        if (partitionMetadata == null || partitionMetadata.leader() == null) {
            System.err.println("No leader found for topic " + topic + ", partition " + partition);
            return;
        }
        String leader = partitionMetadata.leader().host();
        String clientId = "client" + topic;

        SimpleConsumer consumer = new SimpleConsumer(leader, port, 1000, 1024 * 4, clientId);
        try {
            FetchRequest fetchRequest = new FetchRequestBuilder()
                    .clientId(clientId)
                    .addFetch(topic, partition, offset, 1000)
                    .build();
            FetchResponse fetchResponse = consumer.fetch(fetchRequest);
            if (fetchResponse.hasError()) {
                System.err.println("Fetch from " + leader + " failed for topic " + topic
                        + ", partition " + partition);
                return;
            }
            printMessages(fetchResponse.messageSet(topic, partition), offset);
        } finally {
            consumer.close();
        }
    }

    /** Prints the offset and UTF-8 decoded payload of every message at or after {@code fromOffset}. */
    private void printMessages(ByteBufferMessageSet messageAndOffsets, long fromOffset) {
        for (MessageAndOffset messageAndOffset : messageAndOffsets) {
            // A fetched batch may start before the requested offset; skip those messages.
            if (messageAndOffset.offset() < fromOffset) {
                continue;
            }
            // Copy the whole payload out of the buffer; ByteBuffer.get() alone reads one byte.
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bytes = new byte[payload.remaining()];
            payload.get(bytes);
            String s = new String(bytes, StandardCharsets.UTF_8);
            System.out.println("offset:" + messageAndOffset.offset() + "---" + s);
        }
    }

    /**
     * Asks each seed broker in turn for the topic metadata and returns the metadata of the
     * requested partition.
     *
     * @return the partition metadata, or {@code null} if no broker reported the partition
     */
    public PartitionMetadata getLeader(List<String> brokers, int port, String topic, int partition) {
        TopicMetadataRequest topicMetadataRequest =
                new TopicMetadataRequest(Collections.singletonList(topic));
        for (String broker : brokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(broker, port, 1000, 1024 * 4, "client");
                TopicMetadataResponse topicMetadataResponse = consumer.send(topicMetadataRequest);
                for (TopicMetadata topicMetadatum : topicMetadataResponse.topicsMetadata()) {
                    // Metadata of every partition of the topic.
                    for (PartitionMetadata partitionMetadatum : topicMetadatum.partitionsMetadata()) {
                        if (partitionMetadatum.partitionId() == partition) {
                            return partitionMetadatum;
                        }
                    }
                }
            } catch (Exception e) {
                // A seed broker may be down; report it and try the next one.
                System.err.println("Error communicating with broker " + broker + ": " + e);
            } finally {
                // Close the connection opened for this broker, whatever the outcome.
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
        return null;
    }
}
~~~

添加查找新leader等功能

~~~
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Low-level consumer that reads a given topic, partition and starting offset, and looks the
 * partition leader up again when a fetch fails.
 */
public class MySimpleConsumer {

    /** Maximum number of leader lookups and fetch attempts before giving up. */
    private static final int MAX_ATTEMPTS = 3;
    /** Pause between attempts, in milliseconds. */
    private static final long RETRY_BACKOFF_MS = 1000L;

    public static void main(String[] args) {
        // Seed brokers of the Kafka cluster.
        ArrayList<String> brokers = new ArrayList<>();
        brokers.add("192.168.9.102");
        brokers.add("192.168.9.103");
        brokers.add("192.168.9.104");
        // Broker port.
        int port = 9092;
        // Topic.
        String topic = "second";
        // Partition id.
        int partition = 0;
        // Offset to start reading from.
        long offset = 15;

        MySimpleConsumer mySimpleConsumer = new MySimpleConsumer();
        mySimpleConsumer.getData(brokers, port, topic, partition, offset);
    }

    /**
     * Fetches one batch of messages from the partition leader, starting at {@code offset},
     * and prints each message's offset and payload. When the leader cannot be found or the
     * fetch fails, the leader is looked up again and the fetch retried, up to
     * {@link #MAX_ATTEMPTS} times.
     *
     * @param brokers   seed brokers used to look up the partition leader
     * @param port      broker port
     * @param topic     topic to read
     * @param partition partition id to read
     * @param offset    first offset to fetch
     */
    public void getData(List<String> brokers, int port, String topic, int partition, long offset) {
        String clientId = "client" + topic;
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            if (attempt > 1) {
                pauseBeforeRetry();
            }
            // Look the leader up on every attempt so that a leader change is picked up.
            PartitionMetadata partitionMetadata = getLeader(brokers, port, topic, partition);
            if (partitionMetadata == null || partitionMetadata.leader() == null) {
                System.err.println("No leader found for topic " + topic + ", partition " + partition
                        + " (attempt " + attempt + ")");
                continue;
            }
            String leader = partitionMetadata.leader().host();

            SimpleConsumer consumer = new SimpleConsumer(leader, port, 1000, 1024 * 4, clientId);
            try {
                FetchRequest fetchRequest = new FetchRequestBuilder()
                        .clientId(clientId)
                        .addFetch(topic, partition, offset, 1000)
                        .build();
                FetchResponse fetchResponse = consumer.fetch(fetchRequest);
                if (fetchResponse.hasError()) {
                    System.err.println("Fetch from " + leader + " failed for topic " + topic
                            + ", partition " + partition + " (attempt " + attempt + ")");
                    continue;
                }
                printMessages(fetchResponse.messageSet(topic, partition), offset);
                return;
            } catch (Exception e) {
                // The leader may have gone away between lookup and fetch; retry with a new lookup.
                System.err.println("Error fetching from " + leader + ": " + e);
            } finally {
                consumer.close();
            }
        }
        System.err.println("Giving up on topic " + topic + ", partition " + partition
                + " after " + MAX_ATTEMPTS + " attempts");
    }

    /** Sleeps for {@link #RETRY_BACKOFF_MS}, restoring the interrupt flag if interrupted. */
    private static void pauseBeforeRetry() {
        try {
            Thread.sleep(RETRY_BACKOFF_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Prints the offset and UTF-8 decoded payload of every message at or after {@code fromOffset}. */
    private void printMessages(ByteBufferMessageSet messageAndOffsets, long fromOffset) {
        for (MessageAndOffset messageAndOffset : messageAndOffsets) {
            // A fetched batch may start before the requested offset; skip those messages.
            if (messageAndOffset.offset() < fromOffset) {
                continue;
            }
            // Copy the whole payload out of the buffer; ByteBuffer.get() alone reads one byte.
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bytes = new byte[payload.remaining()];
            payload.get(bytes);
            String s = new String(bytes, StandardCharsets.UTF_8);
            System.out.println("offset:" + messageAndOffset.offset() + "---" + s);
        }
    }

    /**
     * Asks each seed broker in turn for the topic metadata and returns the metadata of the
     * requested partition.
     *
     * @return the partition metadata, or {@code null} if no broker reported the partition
     */
    public PartitionMetadata getLeader(List<String> brokers, int port, String topic, int partition) {
        TopicMetadataRequest topicMetadataRequest =
                new TopicMetadataRequest(Collections.singletonList(topic));
        for (String broker : brokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(broker, port, 1000, 1024 * 4, "client");
                TopicMetadataResponse topicMetadataResponse = consumer.send(topicMetadataRequest);
                for (TopicMetadata topicMetadatum : topicMetadataResponse.topicsMetadata()) {
                    // Metadata of every partition of the topic.
                    for (PartitionMetadata partitionMetadatum : topicMetadatum.partitionsMetadata()) {
                        if (partitionMetadatum.partitionId() == partition) {
                            return partitionMetadatum;
                        }
                    }
                }
            } catch (Exception e) {
                // A seed broker may be down; report it and try the next one.
                System.err.println("Error communicating with broker " + broker + ": " + e);
            } finally {
                // Close the connection opened for this broker, whatever the outcome.
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
        return null;
    }
}
~~~