💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
[TOC] # MapReduce重要配置参数 ## 资源相关参数 **以下参数是在用户自己的mr应用程序中配置就可以生效** 1. **mapreduce.map.memory.mb**: 一个Map Task可使用的资源上限(单位:MB),默认为1024。如果Map Task实际使用的资源量超过该值,则会被强制杀死。 2. **mapreduce.reduce.memory.mb**: 一个Reduce Task可使用的资源上限(单位:MB),默认为1024。如果Reduce Task实际使用的资源量超过该值,则会被强制杀死。 3. **mapreduce.map.cpu.vcores**: 每个Map task可使用的最多cpu core数目, 默认值: 1 4. **mapreduce.reduce.cpu.vcores**: 每个Reduce task可使用的最多cpu core数目, 默认值: 1 5. `mapreduce.map.java.opts`: Map Task的JVM参数,你可以在此配置默认的java heap size等参数, e.g. `“-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc” `(@taskid@会被Hadoop框架自动换为相应的taskid), 默认值: “” 6. `mapreduce.reduce.java.opts`: Reduce Task的JVM参数,你可以在此配置默认的java heap size等参数, e.g. `“-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc”`, 默认值: “” **应该在yarn启动之前就配置在服务器的配置文件中才能生效** 7. `yarn.scheduler.minimum-allocation-mb` 1024 给应用程序container分配的最小内存 8. `yarn.scheduler.maximum-allocation-mb` 8192 给应用程序container分配的最大内存 9. `yarn.scheduler.minimum-allocation-vcores` 1 10. `yarn.scheduler.maximum-allocation-vcores` 32 11. `yarn.nodemanager.resource.memory-mb` 8192 **shuffle性能优化的关键参数,应在yarn启动之前就配置好** 12. mapreduce.task.io.sort.mb 100 ` //shuffle的环形缓冲区大小,默认100m` 14. mapreduce.map.sort.spill.percent 0.8 `//环形缓冲区溢出的阈值,默认80%` ## 容错相关参数 1. `mapreduce.map.maxattempts`: 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。 2. `mapreduce.reduce.maxattempts`: 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。 3. `mapreduce.map.failures.maxpercent`: 当失败的Map Task失败比例超过该值为,整个作业则失败,默认值为0. 如果你的应用程序允许丢弃部分输入数据,则该该值设为一个大于0的值,比如5,表示如果有低于5%的Map Task失败(如果一个Map Task重试次数超过mapreduce.map.maxattempts,则认为这个Map Task失败,其对应的输入数据将不会产生任何结果),整个作业扔认为成功。 4. `mapreduce.reduce.failures.maxpercent`: 当失败的Reduce Task失败比例超过该值为,整个作业则失败,默认值为0. 5. `mapreduce.task.timeout`: Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,可能是卡住了,也许永远会卡主,为了防止因为用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是300000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是`“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”` ## 本地运行mapreduce 作业 设置以下几个参数: ~~~ mapreduce.framework.name=local mapreduce.jobtracker.address=local fs.defaultFS=local ~~~ ## 效率和稳定性相关参数 1. mapreduce.map.speculative: 是否为Map Task打开推测执行机制,默认为false 2. mapreduce.reduce.speculative: 是否为Reduce Task打开推测执行机制,默认为false 3. mapreduce.job.user.classpath.first & mapreduce.task.classpath.user.precedence:当同一个class同时出现在用户jar包和hadoop jar中时,优先使用哪个jar包中的class,默认为false,表示优先使用hadoop jar中的class。 4. mapreduce.input.fileinputformat.split.minsize: FileInputFormat做切片时的最小切片大小, 5. mapreduce.input.fileinputformat.split.maxsize: FileInputFormat做切片时的最大切片大小 (切片的默认大小就等于blocksize,即 134217728) # 全局计数器 在实际生产代码中,常常需要将数据处理过程中遇到的不合规数据行进行全局计数,类似这种需求可以借助mapreduce框架中提供的全局计数器来实现 ![](https://box.kancloud.cn/785f99ed03a9fa67c23ae1e8645a1d07_1051x899.png) 这个统计是全局的 # 多job串联 一个稍复杂点的处理逻辑往往需要多个mapreduce程序串联处理,多job的串联可以借助mapreduce框架的JobControl实现 1. 我们可以用shell脚本,根据状态返回,来决定下一步的shell执行还是不执行 2. 可以设置多个job他们的依赖关系 ~~~ ControlledJob cJob1 = new ControlledJob(job1.getConfiguration()); ControlledJob cJob2 = new ControlledJob(job2.getConfiguration()); ControlledJob cJob3 = new ControlledJob(job3.getConfiguration()); cJob1.setJob(job1); cJob2.setJob(job2); cJob3.setJob(job3); // 设置作业依赖关系,job2执行依赖job1,job3依赖job2 cJob2.addDependingJob(cJob1); cJob3.addDependingJob(cJob2); //设置JobControl,里面放一个组名 JobControl jobControl = new JobControl("RecommendationJob"); jobControl.addJob(cJob1); jobControl.addJob(cJob2); jobControl.addJob(cJob3); // 新建一个线程来运行已加入JobControl中的作业,开始进程并等待结束 Thread jobControlThread = new Thread(jobControl); jobControlThread.start(); //判断是不是已经finish了,没有finish就继续执行 while (!jobControl.allFinished()) { Thread.sleep(500); } jobControl.stop(); return 0; ~~~ # 数据压缩 ## 概述 这是mapreduce的一种优化策略:通过压缩编码对mapper或者reducer的输出进行压缩,以减少磁盘IO,提高MR程序运行速度(但相应增加了cpu运算负担) 1. Mapreduce支持将map输出的结果或者reduce输出的结果进行压缩,以减少网络IO或最终输出数据的体积 2. 压缩特性运用得当能提高性能,但运用不当也可能降低性能 3. 基本原则: 运算密集型的job,少用压缩 IO密集型的job,多用压缩 ## MR支持的压缩编码 ![](https://box.kancloud.cn/438a3c2e9ecc41a083c8bbcfc05ffa66_657x202.png) ## Reducer输出压缩 在配置参数或在代码中都可以设置reduce的输出压缩 1. 在配置参数中设置 ~~~ mapreduce.output.fileoutputformat.compress=false mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec mapreduce.output.fileoutputformat.compress.type=RECORD ~~~ 2. 在代码中设置 ~~~ Job job = Job.getInstance(conf); FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class.forName("")); ~~~ ## Mapper输出压缩 在配置参数或在代码中都可以设置reduce的输出压缩 1. 在配置参数中设置 ~~~ mapreduce.map.output.compress=false mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaultCodec ~~~ 2. 在代码中设置: ~~~ conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true); conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class); ~~~ ## 压缩文件的读取(源码) Hadoop自带的InputFormat类内置支持压缩文件的读取,比如TextInputformat类,在其initialize方法中: ~~~ public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); // open the file and seek to the start of the split final FileSystem fs = file.getFileSystem(job); fileIn = fs.open(file); //根据文件后缀名创建相应压缩编码的codec CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file); if (null!=codec) { isCompressedInput = true; decompressor = CodecPool.getDecompressor(codec); //判断是否属于可切片压缩编码类型 if (codec instanceof SplittableCompressionCodec) { final SplitCompressionInputStream cIn = ((SplittableCompressionCodec)codec).createInputStream( fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK); //如果是可切片压缩编码,则创建一个CompressedSplitLineReader读取压缩数据 in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes); start = cIn.getAdjustedStart(); end = cIn.getAdjustedEnd(); filePosition = cIn; } else { //如果是不可切片压缩编码,则创建一个SplitLineReader读取压缩数据,并将文件输入流转换成解压数据流传递给普通SplitLineReader读取 in = new SplitLineReader(codec.createInputStream(fileIn, decompressor), job, this.recordDelimiterBytes); filePosition = fileIn; } } else { fileIn.seek(start); //如果不是压缩文件,则创建普通SplitLineReader读取数据 in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes); filePosition = fileIn; } ~~~