💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
[TOC] # 需求 ![](https://box.kancloud.cn/63051cc07a8c0f47375bb7ebc45408c6_902x636.png) 假如数据量巨大,两表的数据是以文件的形式存储在HDFS中,需要用mapreduce程序来实现一下SQL查询运算: ~~~ select a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product b on a.pid = b.id ~~~ 这个商品信息表,是存一些商品信息的,数据量没有多大,可以放在缓存中,他有个分布式缓存 然后在每个map启动的时候,能把数据获取过去 ![](https://box.kancloud.cn/26f61fc0088454fa3006de9bbf6ff641_2078x1048.png) # 代码 `job.addCacheFile`这个就是添加缓存的 然后我们看下map这个类 ![](https://box.kancloud.cn/83acfa7550500dd8c3d115154b22eab9_1554x576.png) 他在运行的时候,先执行setup方法,所以我们用setup方法把数据先拉取到自己的map中 然后这个任务,map就完成了,就不需要reduce了 job.setNumReduceTasks(0); ~~~ package cn.itcast.mapreduce.CacheFile; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.util.HashMap; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MapJoinDistributedCacheFile { private static final Log log = LogFactory.getLog(MapJoinDistributedCacheFile.class); public static class MapJoinDistributedCacheFileMapper extends Mapper<LongWritable, Text, Text, NullWritable>{ FileReader in = null; BufferedReader reader = null; HashMap<String,String[]> b_tab = new HashMap<String, String[]>(); @Override protected void setup(Context context)throws IOException, InterruptedException { // 此处加载的是产品表的数据 in = new FileReader("pdts.txt"); reader = new BufferedReader(in); String line =null; while(StringUtils.isNotBlank((line=reader.readLine()))){ String[] split = line.split(","); String[] products = {split[0],split[1]}; b_tab.put(split[0], products); } IOUtils.closeStream(reader); IOUtils.closeStream(in); } @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String line = value.toString(); String[] orderFields = line.split(","); String pdt_id = orderFields[1]; String[] pdtFields = b_tab.get(pdt_id); String ll = orderFields[0] + "\t" + pdtFields[1] + "\t" + orderFields[1] + "\t" + orderFields[2] ; context.write(new Text(ll), NullWritable.get()); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MapJoinDistributedCacheFile.class); job.setMapperClass(MapJoinDistributedCacheFileMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path("D:/mapjoin/input")); FileOutputFormat.setOutputPath(job, new Path("D:/mapjoin/output")); job.setNumReduceTasks(0); job.addCacheFile(new URI("file:/D:/pdts.txt")); // job.addCacheFile(new URI("hdfs://mini1:9000/cachefile/pdts.txt")); job.waitForCompletion(true); } } ~~~