💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
[TOC] # 介绍 拦截器是简单的插件式组件,设置在source和channel之间。source接收到的事件event,在写入channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个source接收到的事件。可以自定义拦截器 # flume内置的拦截器 ## 时间戳拦截器 flume中一个最经常使用的拦截器 ,该拦截器的作用是将时间戳插入到flume的事件报头中。如果不使用任何拦截器,flume接受到的只有message。时间戳拦截器的配置: ![](https://box.kancloud.cn/bd7dc87161ac8e6db59b6bcb0e4c7f0a_700x343.png) source连接到时间戳拦截器的配置: ~~~ a1.sources.r1.interceptors=i1 a1.sources.r1.interceptors.i1.type=timestamp a1.sources.r1.interceptors.i1.preserveExisting=false ~~~ 事件打印出来 ~~~ 18/07/13 01:08:44 INFO sink.LoggerSink: Event: { headers:{timestamp=1531415324910} body: 68 65 6C 6C 6F 0D hello. } ~~~ 可以看到headers中有添加时间戳 一般情况headers里面是空的 ## 主机拦截器 主机拦截器插入服务器的ip地址或者主机名,agent将这些内容插入到事件的报头中。事件报头中的key使用hostHeader配置,默认是host。主机拦截器的配置: ![](https://box.kancloud.cn/1ca1a6226c9c1bea011b001bbdf3592f_698x411.png) source连接到主机拦截器的配置: ~~~ a1.sources.r1.interceptors=i2 a1.sources.r1.interceptors.i2.type=host a1.sources.r1.interceptors.i2.useIP=false a1.sources.r1.interceptors.i2.preserveExisting=false ~~~ ## 静态拦截器 静态拦截器的作用是将k/v插入到事件的报头中。配置如下 ![](https://box.kancloud.cn/b39173374252bc5040ff126caff8856b_695x409.png) source连接到静态拦截器的配置: ~~~ a1.sources.r1.interceptors= i3 a1.sources.r1.interceptors.static.type=static # 自己指定k/v a1.sources.r1.interceptors.static.key=logs a1.sources.r1.interceptors.static.value=logFlume a1.sources.r1.interceptors.static.preserveExisting=false ~~~ ## 正则过滤拦截器 在日志采集的时候,可能有一些数据是我们不需要的,这样添加过滤拦截器,可以过滤掉不需要的日志,也可以根据需要收集满足正则条件的日志。配置如下 ![](https://box.kancloud.cn/b857f5db3638753c351a4be89fad5979_681x411.png) source连接到正则过滤拦截器的配置: ~~~ a1.sources.r1.interceptors=i4 a1.sources.r1.interceptors.i4.type=REGEX_FILTER # 保留内容中出现rm或者kill的字符串的记录 a1.sources.r1.interceptors.i4.regex=(rm)|(kill) a1.sources.r1.interceptors.i4.excludeEvents=false ~~~ 这样配置的拦截器就只会接收日志消息中带有rm 或者kill的日志。 测试案例: test_regex.conf ~~~ # 定义这个agent中各组件的名字 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 描述和配置source组件:r1 a1.sources.r1.type = netcat a1.sources.r1.bind = master a1.sources.r1.port = 44444 a1.sources.r1.interceptors=i4 a1.sources.r1.interceptors.i4.type=REGEX_FILTER #保留内容中出现hadoop或者是spark的字符串的记录 a1.sources.r1.interceptors.i4.regex=(hadoop)|(spark) a1.sources.r1.interceptors.i4.excludeEvents=false # 描述和配置sink组件:k1 # 输出到控制台上 a1.sinks.k1.type = logger # 描述和配置channel组件,此处使用是内存缓存的方式 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 描述和配置source channel sink之间的连接关系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 ~~~ 只接受存在hadoop或者spark记录