企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # 分析 采集需求:比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs,使用agent串联 ![](https://box.kancloud.cn/88f4fb197886cbaf9c9220395918d357_932x270.png) 根据需求,首先定义以下3大要素 第一台flume agent * 采集源,即source——监控文件内容更新 : `exec 'tail -F file'` * 下沉目标,即sink——数据的发送者,实现序列化 : avro sink * Source和sink之间的传递通道——channel,可用file channel 也可以用 内存channel 第二台flume agent * 采集源,即source——接受数据。并实现反序列化 : avro source * 下沉目标,即sink——HDFS文件系统 : HDFS sink * Source和sink之间的传递通道——channel,可用file channel 也可以用 内存channel # 配置文件 第一台配置 Flume-agent1 ~~~ #tail-avro-avro-logger.conf # Name the components on this agent # 定义名称 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec # 监听这个文件 a1.sources.r1.command = tail -F /root/logs/test.log # Describe the sink ##sink端的avro是一个数据发送者 a1.sinks.k1.type = avro # 推给这个机器,自己定义 a1.sinks.k1.hostname = master # 端口 a1.sinks.k1.port = 41414 # 批量大小 a1.sinks.k1.batch-size = 10 # Use a channel which buffers events in memory # 内存channels a1.channels.c1.type = memory # 管道的容量,字节 a1.channels.c1.capacity = 1000 # 事务的类型,多少条之后source推送到channel或者channel推送到sinks a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel # 组装起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 ~~~ Flume-agent2: avro-hdfs.conf ~~~ a1.sources = r1 a1.sinks =s1 a1.channels = c1 ##source中的avro组件是一个接收者服务 a1.sources.r1.type = avro # 绑定一个ip和端口 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 41414 # 类型hdfs a1.sinks.s1.type=hdfs # hdfs目录 a1.sinks.s1.hdfs.path=hdfs://master:9000/flumedata # 文件的前缀,在hdfs上前缀 a1.sinks.s1.hdfs.filePrefix = access_log # 批次大小,就是文件达到多少条才提交到hdfs a1.sinks.s1.hdfs.batchSize= 100 # 当前文件存储数据类型,还可以用压缩格式 a1.sinks.s1.hdfs.fileType = DataStream # 文件的格式类型 a1.sinks.s1.hdfs.writeFormat =Text # 达到下面的三个任何一个就按照那个标准生成一个新文件 #滚动生成的文件按大小生成 agent1.sinks.sink1.hdfs.rollSize = 10240 #滚动生成的文件按行数生成 agent1.sinks.sink1.hdfs.rollCount = 1000 #滚动生成的文件按时间生成,秒 agent1.sinks.sink1.hdfs.rollInterval = 10 # 整体就是每10分钟滚动生成一个目录 #开启滚动生成目录 agent1.sinks.sink1.hdfs.round = true #以10为一梯度滚动生成,单位在下面 agent1.sinks.sink1.hdfs.roundValue = 10 #单位为分钟 agent1.sinks.sink1.hdfs.roundUnit = minute # 管道的类型 a1.channels.c1.type = memory # 管道的容量,字节 a1.channels.c1.capacity = 1000 # 事务的类型,多少条之后source推送到channel或者channel推送到sinks a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.s1.channel = c1 ~~~