ThinkChat🤖让你学习和工作更高效,注册即送10W Token,即刻开启你的AI之旅 广告
[TOC] # 为什么要有kafka stream spark和storm都是流式处理框架,而kafka stream提供的是一个基于kafka的流式处理类库.框架要求开发者按照特定的方式去开发逻辑部分,供框架调用.开发者很难了解框架的具体运行方式,从而使得调试成本高,并且使用受限. 而kafka stream作为流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试. 就流式处理系统而言,基本都支持kafka作为数据源.例如,storm具有专门的kafka-spout,而spark也提供专门的spark-streaming-kafka模块.事实上,kafka基本上是主流的流式处理系统的标准数据源,换而言之,大部分流式系统中都已部署了kafka,此时使用kafka stream的成本非常低 使用Storm或Spark Streaming时,需要为框架本身的进程预留资源,如Storm的supervisor和Spark on YARN的node manager。即使对于应用实例而言,框架本身也会占用部分资源,如Spark Streaming需要为shuffle和storage预留内存。但是Kafka作为类库不占用系统资源。 由于Kafka本身提供数据持久化,因此Kafka Stream提供滚动部署和滚动升级以及重新计算的能力。 由于Kafka Consumer Rebalance机制,Kafka Stream可以在线动态调整并行度 # 例子 **需求** 实时处理单词带有">>>"前缀的内容.例如输入"123>>>abc",最终处理成abc 需求分析 数据清洗案例 ![](https://box.kancloud.cn/07de74fcbf992bcc671732e6c07d8c19_821x168.png) **代码** 主类 ~~~ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; //泛型一般用序列化的字节数组不用string public class LogProcessor implements Processor<byte[], byte[]> { private ProcessorContext context; //初始化 @Override public void init(ProcessorContext processorContext) { this.context = processorContext; } //处理,实时处理,每条都调用这个 @Override public void process(byte[] bytes, byte[] bytes2) { //原来数据 String inputOri = new String(bytes2); //如果包含>>>则去除 if (inputOri.contains(">>>")) { inputOri = inputOri.split(">>>")[1].trim(); } //把数据写出去 context.forward(bytes, inputOri.getBytes()); } //周期性调用 @Override public void punctuate(long l) { } //释放资源 @Override public void close() { } } ~~~ 启动类 ~~~ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.TopologyBuilder; import java.util.Properties; public class Application { public static void main(String[] args) { //来自的topic String fromTopic = "test2"; //目的的topic String toTopic = "test3"; //设置参数 Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092,slave2:9092"); //实例化StreamConfig StreamsConfig config = new StreamsConfig(props); //构建拓扑 TopologyBuilder builder = new TopologyBuilder(); //定义kafka stream处理数据的来源,名字随便起 builder.addSource("SOURCE", fromTopic) .addProcessor("PROCESSOR", new ProcessorSupplier<byte[], byte[]>() { @Override public Processor get() { //我们定义的processor return new LogProcessor(); } }, "SOURCE") .addSink("SINK", toTopic, "PROCESSOR"); //根据StreamConfig对象以及用于构建拓补的Builder对象实例化kafka stream KafkaStreams streams = new KafkaStreams(builder, config); streams.start(); } } ~~~ **测试** 生产者 ~~~ kafka-console-producer.sh --broker-list master:9092.slave1:9092,slave2:9092 --topic test2 ~~~ 消费者 ~~~ kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic test3 ~~~