ThinkChat🤖让你学习和工作更高效,注册即送10W Token,即刻开启你的AI之旅 广告
[TOC] # 简介 MapReduce任务的输入文件一般是存储在HDFS里面.输入的文件格式包括:基于行的日志文件,二进制格式文件.这些文件一般很大,达到数十G,甚至更大,那么MapReduce是如何读取这些数据的? InputFormat常见的接口实现类包括: TextInputFormat, KeyValueTextInputFormat, NLineInputFormat, CombineTextInputFormat和自定义InputFormat等 # TextInputFormat TextInputFormat是默认的InputFormat.每条记录是一行输入. 键是LongWritable类型,存储该行在整个文件中的字节偏移量,值是这行的内容,不包括任何行终止符(换行符和回车符) 一些是一个示例,比如一个分片包含了如下4条文本记录 ~~~ Rich learning form Intelligent learning engine Learning more convenient From the real demand for more close to the enterprise ~~~ 每条记录表示为以下键值对 ~~~ (0, Rich learning form) (19, Intelligent learning engine) (47, Learning more convenient) (72, From the real demand for more close to the enterprise) ~~~ 很明显,键并不是行号.一般情况下,很难取得行号,因为文件按字节而不是按行切分为分片 # KeyValueTextInputFormat **每一行均为一条记录**,被分割符分割成key, value. 可以通过在驱动类中设置 ~~~ conf.set(keyValueLineRecordReader.KEY_VALUE_SEPERATOR, " "); ~~~ 来设定分割符,默认分割符是tab(\t) 以下是一个示例,输入是一个包含4条记录的分片.其中--->表示一个(水平方向的)制表符 ~~~ line1 ---> Rich learning form line2 ---> Intelligent learning engine line3 ---> Learning more convenient line4 ---> From the real demand for more close to the enterprise ~~~ 每条记录表示为以下键/值对 ~~~ (line1, Rich learning form) (line2, Intelligent learning engine) (line3, Learning more convenient) (line4, From the real demand for more close to the enterprise) ~~~ 此时的键是每行排在制表符之前的Text序列 ## 代码 **map** ~~~ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class KVTextMapper extends Mapper<Text, Text, Text, LongWritable> { Text k = new Text(); LongWritable v = new LongWritable(); @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { //设置key和value k.set(key); //设置key的个数 v.set(1); //写出 context.write(k, v); } } ~~~ **reducer** ~~~ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class KVTextReducer extends Reducer<Text, LongWritable, Text, LongWritable> { LongWritable v = new LongWritable(); @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long count = 0l; //汇总统计 for (LongWritable value : values) { count += value.get(); } v.set(count); //输出 context.write(key, v); } } ~~~ **驱动** ~~~ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class MyDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); //设置切割符 conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " "); //获取job对象 Job job = Job.getInstance(conf); //设置jar包关系,关联mapper和reducer job.setJarByClass(MyDriver.class); //告诉框架,我们程序所用的mapper类和reduce类是什么 job.setMapperClass(KVTextMapper.class); job.setReducerClass(KVTextReducer.class); //告诉框架我们程序输出的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //告诉框架,我们程序使用的数据读取组件,结果输出所用的组件是什么 //TextInputFormat是mapreduce程序中内置的一种读取数据组件,准备的叫做读取文本的输入组件 job.setInputFormatClass(KeyValueTextInputFormat.class); //job.setOutputFormatClass(TextOutputFormat.class); //告诉框架,我们要处理的数据文件在那个路径下 FileInputFormat.setInputPaths(job,new Path("/Users/jdxia/Desktop/website/data/input/")); //告诉框架我们的处理结果要输出到什么地方 FileOutputFormat.setOutputPath(job,new Path("/Users/jdxia/Desktop/website/data/output/")); //提交后,然后等待服务器端返回值,看是不是true boolean res = job.waitForCompletion(true); //设置成功就退出码为0 System.exit(res?0:1); } } ~~~ # NLineInputFormat 如果使用NLineInputFormat,代表每个map进程处理的InputSplit不再按block块去划分,而是按NLineInputFormat指定的行数N来划分.即输入文件的总行数/N=切片数,如果不整除,切片数=商+1 以下是一个示例,仍然以上面的4行输入为例子 ~~~ Rich learning form Intelligent learning engine Learning more convenient From the real demand for more close to the enterprise ~~~ 例如,如果N是2,则每个输入分片包含两行,开启2个maptask ~~~ (0, Rich learning form) (19, Intelligent learning engine) ~~~ 另一个mapper则收到后两行 ~~~ (47, Learning more convenient) (72, From the real demand for more close to the enterprise) ~~~ 这里的键和值与TextInputFormat生成的一样 ## 代码 **map** ~~~ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class NLineMapper extends Mapper<LongWritable, Text, Text, LongWritable> { private Text k = new Text(); private LongWritable v = new LongWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //获取一行 String line = value.toString(); //切割 String[] splited = line.split(" "); //循环写出 for (int i = 0; i < splited.length; i++) { k.set(splited[i]); context.write(k, v); } } } ~~~ **reducer** ~~~ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class NLineReducer extends Reducer<Text, LongWritable, Text, LongWritable> { LongWritable v = new LongWritable(); @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long count= 0l; //汇总 for (LongWritable value : values) { count += value.get(); } v.set(count); //输出 context.write(key, v); } } ~~~ **驱动** ~~~ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class NLineDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //获取job对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //设置每个切片InputSplit中划分三条记录 NLineInputFormat.setNumLinesPerSplit(job, 3); //使用NLineInputFormat处理记录 job.setInputFormatClass(NLineInputFormat.class); //设置jar包位置,关联mapper和reducer job.setJarByClass(NLineDriver.class); job.setMapperClass(NLineMapper.class); job.setReducerClass(NLineReducer.class); //设置map输出kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //设置最终输出kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //告诉框架,我们要处理的数据文件在那个路径下 FileInputFormat.setInputPaths(job,new Path("/Users/jdxia/Desktop/website/data/input/")); //告诉框架我们的处理结果要输出到什么地方 FileOutputFormat.setOutputPath(job,new Path("/Users/jdxia/Desktop/website/data/output/")); //这边不用submit,因为一提交就和我这个没关系了,我这就断开了就看不见了 // job.submit(); //提交后,然后等待服务器端返回值,看是不是true boolean res = job.waitForCompletion(true); //设置成功就退出码为0 System.exit(res?0:1); } } ~~~ 输出结果的切片数 ~~~ number of splits ~~~