ThinkChat🤖让你学习和工作更高效,注册即送10W Token,即刻开启你的AI之旅 广告
[TOC] # 需求 对2张表进行合并,并对产品id进行排序 ![](https://box.kancloud.cn/ca40b538d618ee50001cc0b9b4cc28bc_1638x314.png) **maptask** map中处理的事情 1. 获取输入文件类型 2. 获取输入数据 3. 不同文件分别处理 4. 封装bean对象输出 ![](https://box.kancloud.cn/3b969fd60aff3bd0f6e5d7d40d34cc0a_391x233.png) 加个flag标记是哪个表 默认对产品id排序 ![](https://box.kancloud.cn/0ca068e7701e036fdb479dce5243649b_400x302.png) **reducetask** reduce方法缓存订单数据集合和产品表,然后合并 ![](https://box.kancloud.cn/7df32b8c8ad4e04d86ac20e0ebe186ec_390x257.png) # 准备数据 order.txt ~~~ 1001 01 1 1002 02 2 1003 03 3 1001 01 1 1002 02 2 1003 03 3 ~~~ pd.txt ~~~ 01 小米 02 华为 03 格力 ~~~ # 代码 ## bean ~~~ import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class TableBean implements Writable { //订单id private String order_id; //产品id private String pid; //产品数量 private int amount; //产品名称 private String pName; //标记是订单表(0)还是产品表(1) private String flag; public TableBean() { super(); } public TableBean(String order_id, String pid, int amount, String pName, String flag) { super(); this.order_id = order_id; this.pid = pid; this.amount = amount; this.pName = pName; this.flag = flag; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(order_id); out.writeUTF(pid); out.writeInt(amount); out.writeUTF(pName); out.writeUTF(flag); } @Override public void readFields(DataInput in) throws IOException { order_id = in.readUTF(); pid = in.readUTF(); amount = in.readInt(); pName = in.readUTF(); flag = in.readUTF(); } //getter/setter/toString } ~~~ ## map阶段 ~~~ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> { Text k = new Text(); TableBean v = new TableBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
{ //区分两张表 FileSplit split = (FileSplit) context.getInputSplit(); String name = split.getPath().getName(); //获取一行 String line = value.toString(); if (name.startsWith("order")) { //订单表 String[] fields = line.split(" "); v.setOrder_id(fields[0]); v.setPid(fields[1]); v.setAmount(Integer.parseInt(fields[2])); v.setpName(""); v.setFlag("0"); k.set(fields[1]); } else { //产品表 String[] fields = line.split(" "); v.setOrder_id(""); v.setPid(fields[0]); v.setAmount(0); v.setpName(fields[1]); v.setFlag("1"); k.set(fields[0]); } context.write(k, v); } } ~~~ ## reduce阶段 ~~~ import org.apache.commons.beanutils.BeanUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.ArrayList; public class TableReucer extends Reducer<Text, TableBean, TableBean, NullWritable> { @Override protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException { //准备存储订单的集合 ArrayList<TableBean> orderBeans = new ArrayList<>(); //准备bean对象 TableBean pdBean = new TableBean(); //把订单表放到集合中,产品表放到bean中 for (TableBean bean : values) { if ("0".equals(bean.getFlag())) { //订单表 //拷贝传递过来的每条订单数据到集合中 TableBean orderBean = new TableBean(); try { BeanUtils.copyProperties(orderBean, bean); } catch (Exception e) { e.printStackTrace(); } //如果不放拷贝的在这边的话,放bean的话,这边会一直都是最后一个值 orderBeans.add(orderBean); } else { //产品表 try { //拷贝传递过来的产品表到bean中 BeanUtils.copyProperties(pdBean, bean); } catch (Exception e) { e.printStackTrace(); } } } //拼接表,循环链表,拼接数据 for (TableBean tableBean : orderBeans) { tableBean.setpName(pdBean.getpName()); context.write(tableBean, NullWritable.get()); } } } ~~~ ## 驱动类 ~~~ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import 
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class TableDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(TableDriver.class); job.setMapperClass(TableMapper.class); job.setReducerClass(TableReucer.class); //告诉框架,我们程序输出的数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(TableBean.class); job.setOutputKeyClass(TableBean.class); job.setOutputValueClass(NullWritable.class); //设置输入文件和输出路径 FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/data/input")); FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/data/output")); job.waitForCompletion(true); } } ~~~ # 缺点 合并的操作是在reduce阶段完成的,reduce端处理压力太大,map节点的运算负载则很低,资源利用率不高,而且在reduce阶段容易产生数据倾斜