多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
[TOC] # 简介 Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。Flume有各种自带的拦截器,比如:TimestampInterceptor(时间戳)、HostInterceptor(主机)、RegexExtractorInterceptor(正则)等,通过使用不同的拦截器,实现不同的功能。但是以上的这些拦截器,不能改变原有日志数据的内容或者对日志信息添加一定的处理逻辑,当一条日志信息有几十个甚至上百个字段的时候,在传统的Flume处理下,收集到的日志还是会有对应这么多的字段,也不能对你想要的字段进行对应的处理 # 自定义拦截器 根据实际业务的需求,为了更好的满足数据在应用层的处理,通过自定义Flume拦截器,过滤掉不需要的字段,并对指定字段加密处理,将源数据进行预处理。减少了数据的传输量,降低了存储的开销 # 需求 ~~~ 13601249301 100 200 300 400 500 600 700 13601249302 100 200 300 400 500 600 700 13601249303 100 200 300 400 500 600 700 13601249304 100 200 300 400 500 600 700 13601249305 100 200 300 400 500 600 700 13601249306 100 200 300 400 500 600 700 13601249307 100 200 300 400 500 600 700 13601249308 100 200 300 400 500 600 700 13601249309 100 200 300 400 500 600 700 13601249310 100 200 300 400 500 600 700 13601249311 100 200 300 400 500 600 700 13601249312 100 200 300 400 500 600 700 ~~~ 把这个变成这个样子 第一行加密,中间有几行舍弃 ![](https://box.kancloud.cn/9a73270b2419e629800c1ed8bf4dae99_1436x190.png) # 实现 二部分 ## 编写java代码,自定义拦截器; 内容包括: 1. 定义一个类CustomParameterInterceptor实现Interceptor接口。 2. 在CustomParameterInterceptor类中定义变量,这些变量是需要到 Flume的配置文件中进行配置使用的。每一行字段间的分隔符(fields_separator)、通过分隔符分隔后,所需要列字段的下标(indexs)、多个下标使用的分隔符(indexs_separator)、多个下标使用的分隔符(indexs_separator)。 3. 添加CustomParameterInterceptor的有参构造方法。并对相应的变量进行处理。将配置文件中传过来的unicode编码进行转换为字符串。 4. 写具体的要处理的逻辑intercept()方法,一个是单个处理的,一个是批量处理。 5. 接口中定义了一个内部接口Builder,在configure方法中,进行一些参数配置。并给出,在flume的conf中没配置一些参数时,给出其默认值。通过其builder方法,返回一个CustomParameterInterceptor对象。 6. 定义一个静态类,类中封装MD5加密方法 7. 通过以上步骤,自定义拦截器的代码开发已完成,然后打包成jar, 放到Flume的根目录下的lib中 ![](https://box.kancloud.cn/4f2e68edbe218f81cdd341ee91d57df7_522x604.png) ## 修改Flume的配置信息 新增配置文件spool-interceptor-hdfs.conf,内容为: ~~~ a1.channels = c1 a1.sources = r1 a1.sinks = s1 #channel a1.channels.c1.type = memory a1.channels.c1.capacity=100000 a1.channels.c1.transactionCapacity=50000 #source a1.sources.r1.channels = c1 # 监控/root/data/下的文件 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /root/data/ a1.sources.r1.batchSize= 50 a1.sources.r1.inputCharset = UTF-8 # 拦截器 a1.sources.r1.interceptors =i1 i2 # 自己定义的java类, $表示内部类 a1.sources.r1.interceptors.i1.type =com.hive.CustomParameterInterceptor$Builder # 下面的定义属性会传递到自定义的类中 # 自定义拦截器的属性,这个是代表分隔符,unicode编码的空格 a1.sources.r1.interceptors.i1.fields_separator=\\u0009 # 当前列我需要取哪些列的下标 a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6 # 当前索引用的什么分隔符,用的是逗号,写unicode编码 a1.sources.r1.interceptors.i1.indexs_separator =\\u002c # 具体加密字段的参数,第一列加密 a1.sources.r1.interceptors.i1.encrypted_field_index =0 # 这个拦截器类型是时间戳 a1.sources.r1.interceptors.i2.type = timestamp #sink a1.sinks.s1.channel = c1 a1.sinks.s1.type = hdfs a1.sinks.s1.hdfs.path =hdfs://master:9000/flume/%Y%m%d # 当前类hdfs里面的属性值 a1.sinks.s1.hdfs.filePrefix = event a1.sinks.s1.hdfs.fileSuffix = .log a1.sinks.s1.hdfs.rollSize = 10485760 a1.sinks.s1.hdfs.rollInterval =20 a1.sinks.s1.hdfs.rollCount = 0 a1.sinks.s1.hdfs.batchSize = 1500 a1.sinks.s1.hdfs.round = true a1.sinks.s1.hdfs.roundUnit = minute a1.sinks.s1.hdfs.threadsPoolSize = 25 a1.sinks.s1.hdfs.useLocalTimeStamp = true a1.sinks.s1.hdfs.minBlockReplicas = 1 a1.sinks.s1.hdfs.fileType =DataStream a1.sinks.s1.hdfs.writeFormat = Text a1.sinks.s1.hdfs.callTimeout = 60000 a1.sinks.s1.hdfs.idleTimeout =60 ~~~ # 代码 先看内部Builder类,里面有configure方法 字段的默认值有CustomParameterInterceptor.Constants这个内部类提供 Builder类里面的builder是构造拦截器,用里面的类来构建 先调用这个类的构造函数 然后`List<Event> intercept`会调用当个intercept ~~~ package com.hive; import com.google.common.base.Charsets; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; import static com.hive.CustomParameterInterceptor.Constants.*; public class CustomParameterInterceptor implements Interceptor { //指明每一行字段的分隔符 private final String fields_separator; //通过分割符分割后,指明需要那列的字段,下标 private final String indexs; //多个下标的分割符 private final String indexs_separator; //需要加密的字段下标 private final String encrypted_field_index; public CustomParameterInterceptor(String fields_separator, String indexs, String indexs_separator, String encrypted_field_index) { //每一行字段的分隔符 String f = fields_separator.trim(); //多个下标的分割符 String i = indexs_separator.trim(); //通过分割符分割后,指明需要那列的字段,下标 this.indexs = indexs; //需要加密的字段下标 this.encrypted_field_index = encrypted_field_index.trim(); if (!f.equals("")) { f = UnicodeToString(f); } //指明每一行字段的分隔符 this.fields_separator = f; if (!i.equals("")) { i = UnicodeToString(i); } //多个下标的分割符 this.indexs_separator = i; } /** * unicode转换为string * \t 制表符 ('\u0009') \n 新行(换行)符 (' ') \r 回车符 (' ') \f 换页符 ('\u000C') \a 报警 * (bell) 符 ('\u0007') \e 转义符 ('\u001B') \cx 空格(\u0020)对应于 x 的控制符 */ private String UnicodeToString(String str) { Pattern pattern = Pattern.compile("(\\\\u(\\p{XDigit}{4}))"); Matcher matcher = pattern.matcher(str); char ch; while (matcher.find()) { ch = (char) Integer.parseInt(matcher.group(2), 16); str = str.replace(matcher.group(1), ch + ""); } return str; } //初始化方法 @Override public void initialize() {} //处理单条事件 @Override public Event intercept(Event event) { if (event == null) { return null; } try { //getBody是具体存放数据的内容,获取每一行数据 String line = new String(event.getBody(), Charsets.UTF_8); //分隔符切分 String[] fields_spilts = line.split(fields_separator); //对应所需要的列的下标 String[] indexs_split = indexs.split(indexs_separator); String newLine = ""; //循环下标数组 for (int i = 0; i < indexs_split.length; i++) { int parseInt = Integer.parseInt(indexs_split[i]); //对字段进行加密 if (!"".equals(encrypted_field_index) && encrypted_field_index.equals(indexs_split[i])) { //将数据最终加密为md5 newLine += StringUtils.GetMD5Code(fields_spilts[parseInt]); } else { newLine += fields_spilts[parseInt]; } //拼接字段分割符 if (i != indexs_split.length - 1) { newLine += fields_separator; } } //把这个新的一行数据设置进去 event.setBody(newLine.getBytes()); return event; } catch (Exception e) { return event; } } //处理很多事件 @Override public List<Event> intercept(List<Event> events) { ArrayList<Event> out = new ArrayList<Event>(); //循环这个时间列表 for (Event event : events) { //调用单个事件 Event outEvent = intercept(event); if (outEvent != null) { out.add(outEvent); } } return out; } @Override public void close() { } //这个Interceptor.Builder可以读取flume的配置文件,因为Builder中继承的Configurable可以读取配置文件 public static class Builder implements Interceptor.Builder { /** * The fields_separator.指明每一行字段的分隔符 */ private String fields_separator; /** * The indexs.通过分隔符分割后,指明需要那列的字段 下标 */ private String indexs; /** * The indexs_separator. 多个下标的分隔符 */ private String indexs_separator; /** * The encrypted_field. 需要加密的字段下标 */ private String encrypted_field_index; //构建对应的拦截器 @Override public Interceptor build() { //用上面一个类来构建,返回外面的那个类,把值传递给他的构造器 return new CustomParameterInterceptor(fields_separator, indexs, indexs_separator, encrypted_field_index); } //能够帮我们获取配置文件定义的参数 @Override public void configure(Context context) { //后面的值是默认值,context可以获取到配置文件的值,在这里面用大写常量代替,具体的值可参考下面的类Constants fields_separator = context.getString(FIELD_SEPARATOR, DEFAULT_FIELD_SEPARATOR); indexs = context.getString(INDEXS, DEFAULT_INDEXS); indexs_separator = context.getString(INDEXS_SEPARATOR, DEFAULT_INDEXS_SEPARATOR); encrypted_field_index = context.getString(ENCRYPTED_FIELD_INDEX, DEFAULT_ENCRYPTED_FIELD_INDEX); } } public static class Constants { /** * The Constant FIELD_SEPARATOR. */ public static final String FIELD_SEPARATOR = "fields_separator"; /** * The Constant DEFAULT_FIELD_SEPARATOR. */ public static final String DEFAULT_FIELD_SEPARATOR = " "; /** * The Constant INDEXS. */ public static final String INDEXS = "indexs"; /** * The Constant DEFAULT_INDEXS. */ public static final String DEFAULT_INDEXS = "0"; /** * The Constant INDEXS_SEPARATOR. */ public static final String INDEXS_SEPARATOR = "indexs_separator"; /** * The Constant DEFAULT_INDEXS_SEPARATOR. */ public static final String DEFAULT_INDEXS_SEPARATOR = ","; /** * The Constant ENCRYPTED_FIELD_INDEX. */ public static final String ENCRYPTED_FIELD_INDEX = "encrypted_field_index"; /** * The Constant DEFAUL_TENCRYPTED_FIELD_INDEX. */ public static final String DEFAULT_ENCRYPTED_FIELD_INDEX = ""; /** * The Constant PROCESSTIME. */ public static final String PROCESSTIME = "processTime"; /** * The Constant PROCESSTIME. */ public static final String DEFAULT_PROCESSTIME = "a"; } /** * 字符串md5加密 */ public static class StringUtils { // 全局数组 private final static String[] strDigits = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"}; // 返回形式为数字跟字符串 private static String byteToArrayString(byte bByte) { int iRet = bByte; // System.out.println("iRet="+iRet); if (iRet < 0) { iRet += 256; } int iD1 = iRet / 16; int iD2 = iRet % 16; return strDigits[iD1] + strDigits[iD2]; } // 返回形式只为数字 private static String byteToNum(byte bByte) { int iRet = bByte; System.out.println("iRet1=" + iRet); if (iRet < 0) { iRet += 256; } return String.valueOf(iRet); } // 转换字节数组为16进制字串 private static String byteToString(byte[] bByte) { StringBuffer sBuffer = new StringBuffer(); for (int i = 0; i < bByte.length; i++) { sBuffer.append(byteToArrayString(bByte[i])); } return sBuffer.toString(); } public static String GetMD5Code(String strObj) { String resultString = null; try { resultString = new String(strObj); MessageDigest md = MessageDigest.getInstance("MD5"); // md.digest() 该函数返回值为存放哈希值结果的byte数组 resultString = byteToString(md.digest(strObj.getBytes())); } catch (NoSuchAlgorithmException ex) { ex.printStackTrace(); } return resultString; } } } ~~~ 然后把这个jar包上传到hive的lib目录 启动: ~~~ flume-ng agent -c conf -f conf/spool-interceptor-hdfs.conf -name a1 -Dflume.root.logger=DEBUG,console ~~~