一、MapReduce2工作机制
1.1、MapReduce2的架构图
1.2、MapReduce2运作步骤
说在前头的话,上图中有一个ResoureceManager,这是一个资源调度器,说白了就是管资源的,在MapReduce1时,所有的事情都是交给Jobtracker来做,包括资源调度,在MapReduce2的框架图当中,进行了明确的分工,减少了各个组件的耦合性,当然优化不止这一点。在hadoop当中的三大组件之一yarn,其实就是一个资源调度框架,并且默认是集成的,只需要稍加配置就好,目前企业对hadoop的使用,基本都在用yarn做资源调度。
客户端的配置信息mapreduce.framework.name为yarn时,客户端会启动YarnRunner(yarn的客户端程序),并将mapreduce作业提交给yarn平台处理。
以下是对MapReduce2框架图的解释:
1.2.1、提交job
Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业(第1步)。新的作业ID(应用ID)由ResourceManager分配(第2步)。作业的client核实作业的输出,计算输入的split,将作业的资源(包括jar包、配置文件和split信息等)拷贝给HDFS(第3步)。最后,通过调用ResourceManager的submitApplication()来提交作业(第4步)。
1.2.2、作业初始化
当ResourceManager收到submitApplication()的请求时, 就将该请求发给调度器(scheduler), 调度器分配container, 然后ResourceManager在该container内启动ApplicationMaster进程, 由NodeManager监控(第5a和5b步)。
MapReduce作业的ApplicationMaster是一个主类为MRAppMaster的Java应用。其通过创造一些bookkeeping对象来监控作业的进度, 得到任务的进度和完成报告(第6步)。MRAppMaster通过分布式文件系统得到由客户端计算好的输入split(第7步)。MRAppMaster为每个输入split创建一个map任务, 根据mapreduce.job.reduces创建reduce任务对象。
1.2.3、任务分配
如果作业很小,ApplicationMaster会选择在其自己的JVM中运行任务。如果不是小作业,那么ApplicationMaster向ResourceManager请求container来运行所有的map和reduce任务(第8步)。这些请求是通过心跳来传输的,包括每个map任务的数据位置,比如存放输入split的主机名和机架(rack)。调度器利用这些信息来调度任务,尽量将任务分配给存储数据的节点,或者退而分配给和存放输入split的节点相同机架的节点。
1.2.4、任务运行
当一个任务由ResourceManager的调度分配给一个container后, ApplicationMaster通过联系NodeManager来启动container(第9a步和9b步)。任务由一个主类为YarnChild的Java应用执行。在运行任务之前首先本地化任务需要的资源,比如作业配置、JAR文件和分布式缓存的所有文件(第10步)。最后,运行map或reduce任务(第11步)。
1.2.5、进度和状态更新
YARN中的任务将其进度和状态(包括counter)返回给ApplicationMaster,客户端每秒轮询(通过mapreduce.client.progressmonitor.pollinterval设置)向ApplicationMaster请求进度更新,展示给用户。
1.2.6、作业完成
除了向ApplicationMaster请求作业进度外,客户端每5分钟都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion. pollinterval来设置。作业完成之后, ApplicationMaster和container会清理工作状态, OutputCommiter的作业清理方法也会被调用。作业的信息会被作业历史服务器存储以备之后用户核查。最后,ApplicationMaster向ResourceManager注销并关闭自己。
二、mapTask的工作机制
先放出整个mapreduce的流程图:
2.1、切片
在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize)),minSize的默认值是1,而maxSize的默认值是long类型的最大值,即可得切片的默认大小是blockSize(128M),maxSize参数如果调得比blocksize小,则会让切片变小,而且就等于配置的这个参数的值,minSize参数调的比blockSize大,则可以让切片变得比blocksize还大
,hadoop为每个分片构建一个map任务,可以并行处理多个分片上的数据,整个数据的处理过程将得到很好的负载均衡,因为一台性能较强的计算机能处理更多的数据分片。分片也不能切得太小,否则多个map和reduce间数据的传输时间,管理分片,构建多个map任务的时间将决定整个作业的执行时间(导致大部分时间都不在计算上)。如果文件大小小于128M,则该文件不会被切片,不管文件多小都会是一个单独的切片,交给一个maptask处理.如果有大量的小文件,将导致产生大量的maptask,大大降低集群性能。
大量小文件的优化策略:
(1) 在数据处理的前端就将小文件整合成大文件,再上传到hdfs上,即避免了hdfs不适合存储小文件的缺点,又避免了后期使用mapreduce处理大量小文件的问题。(最提倡的做法)
(2)小文件已经存在hdfs上了,可以使用另一种inputformat来做切片(CombineFileInputFormat),它的切片逻辑和FileInputFormat(默认)不同,它可以将多个小文件在逻辑上规划到一个切片上,交给一个maptask处理。
2.2、环形缓存区
经过map函数的逻辑处理后的数据输出之后,会通过OutPutCollector收集器将数据收集到环形缓存区保存。环形缓存区的大小默认为100M,当保存的数据达到80%时,就将缓存区的数据溢出到磁盘上保存。
2.3、溢出
环形缓存区的数据达到其容量的80%时就会溢出到磁盘上进行保存,在此过程中,程序会对数据进行分区(默认HashPartition)和排序(默认根据key进行快排),缓存区不断溢出的数据形成多个小文件。其实溢出的过程当中,还会涉及排序,分区,combiner,这些后面详细介绍。
2.4、合并
溢出的多个小文件各个区合并在一起(0区和0区合并成一个0区),形成大文件,通过归并排序保证区内的数据有序。
2.5、输出到运行mapTask的本地磁盘
形成的文件会存放在运行mapTask的本地磁盘上,所以map的结果并不会持久化到HDFS上去,只有reduceTask的结果会持久化到HDFS上。
三、reduceTask的工作机制
3.1、拉取数据阶段
简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求Map Task获取属于自己的文件。之前在mapTask中,从环形缓冲区溢出进文件时,会进行分区,分区决定了这个分区的数据应该被那个reduceTask处理。
3.2、Merge阶段
这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的 merge。与map端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件
3.3、reduce函数处理阶段
把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。
四、shuffle的工作机制
Shuffle 是 MapReduce 的核心,它分布在 MapReduce 的Map 阶段和Reduce阶段。一般把从 Map产生输出开始到Reduce 取得数据 作为输入之前的过程称作Shuffle。如mapreduce的流程图中的步骤2-》步骤7都属于shuffle阶段,即map任务和reduce任务之间的数据流称为shuffle(混洗),而过程5最能体现出混洗这一概念。一般情况下,一个reduce任务的输入数据来自与多个map任务,多个reduce任务的情况下就会出现如过程5所示的,每个reduce任务从map的输出数据中获取属于自己的那个分区的数据。
(1)Collect 阶段:将 Map Task的结果输出到默认大小为 100M的环形缓冲区,保存的是 key/value,Partition 分区信息等。
(2)Spill 阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了Combiner,还会将有相同分区号和 key 的数据进行排序。
(3)Merge 阶段:把所有溢出的临时文件进行一次合并操作,以确保一个Map Task最终只产生一个中间数据文件。
(4)Copy 阶段: Reduce Task 启动 Fetcher 线程到已经完成 Map Task 的节点上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。
(5)Merge 阶段:在 Reduce Task 远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。
(6)Sort 阶段:在对数据进行合并的同时,会进行排序操作,由于 Map Task阶段已经对数据进行了局部的排序,Reduce Task 只需保证 Copy 的数据的最终整体有效性即可。Shuffle 中的缓冲区大小会影响到MapReduce 程序的执行效率,原则上说,缓冲区越大,磁盘 io 的次数越少,执行速度就越快。缓冲区的大小可以通过参数调整, 参数:io.sort.mb 默认 100M。
五、mapreduce代码实现
package com.ibeifeng.mapreduce;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MyWordCountMapReduce extends Configured implements Tool{
//定义map处理类模板
public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private Text outKey = new Text();
private IntWritable outValues = new IntWritable(1);
@Override
public void map(LongWritable key, Text values, Context context)
throws IOException, InterruptedException {
//书写map处理业务逻辑代码
//1.将单词进行分割
String str = values.toString();
// String[] split = str.split(" ");
// for(String s : split) {
// outKey.set(s);
// context.write(outKey, outValues);
// }
StringTokenizer stringtokenizer = new StringTokenizer(str);
while((stringtokenizer.hasMoreTokens())) {
outKey.set(stringtokenizer.nextToken());
context.write(outKey, outValues);
System.out.println("<"+outKey+outValues+">");
}
}
}
//定义combiner处理模块
public static class Combiner extends Reducer<Text, IntWritable, Text, IntWritable>{
private IntWritable outValues = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
//书写reduce处理业务逻辑代码
//1.由于需要累加所以需要定义一个变量做记录
int sum = 0;
for(IntWritable i : values) {
sum+=i.get();
}
outValues.set(sum);
context.write(key, outValues);
System.out.println("<"+key+outValues+">");
}
}
//定义reduce处理类模板
public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
private IntWritable outValues = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
//书写reduce处理业务逻辑代码
//1.由于需要累加所以需要定义一个变量做记录
int sum = 0;
for(IntWritable i : values) {
sum+=i.get();
}
outValues.set(sum);
context.write(key, outValues);
System.out.println("<"+key+outValues+">");
}
}
//配置Driver模块
public int run(String[] args) {
//1.获取配置配置文件对象
Configuration configuration = new Configuration();
//2.创建给mapreduce处理的任务
Job job = null;
try {
job = Job.getInstance(configuration,this.getClass().getSimpleName());
} catch (IOException e) {
e.printStackTrace();
}
try {
//3.创建输入路径
Path source_path = new Path(args[0]);
FileInputFormat.addInputPath(job, source_path);
//4.创建输出路径
Path des_path = new Path(args[1]);
FileOutputFormat.setOutputPath(job, des_path);
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
//设置让任务打包jar运行
job.setJarByClass(MyWordCountMapReduce.class);
//5.设置map
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//================shuffle========================
//1.分区,主要用于自定义分区使用
// job.setPartitionerClass(cls);
//2.排序,主要用于自定义排序使用
// job.setSortComparatorClass(cls);
//3.分组,主要用于自定义分组使用
// job.setGroupingComparatorClass(cls);
//4.可选项,设置combiner,相当于map过程的reduce处理,优化选项,可节省map和reduce之间IO传输量
job.setCombinerClass(Combiner.class);
//================shuffle========================
//6.设置reduce
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//7.提交job到yarn组件上
boolean isSuccess = false;
try {
isSuccess = job.waitForCompletion(true);
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return isSuccess?0:1;
}
//书写主函数
public static void main(String[] args) {
Configuration configuration = new Configuration();
//1.书写输入和输出路径
String[] args1 = new String[] {
"hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/wordcount/input",
"hdfs://hadoop-senior01.ibeifeng.com:8020/user/beifeng/wordcount/output"
};
//2.设置系统以什么用户执行job任务
System.setProperty("HADOOP_USER_NAME", "beifeng");
//3.运行job任务
int status = 0;
try {
status = ToolRunner.run(configuration, new MyWordCountMapReduce(), args1);
} catch (Exception e) {
e.printStackTrace();
}
// int status = new MyWordCountMapReduce().run(args1);
//4.退出系统
System.exit(status);
}
}
六、mapreduce几个重要概念理解
6.1、partition分区
数据从环形缓存区溢出到文件的过程中会根据用户自定义的partition函数进行分区,如果用户没有自定义该函数,程序会用默认的partitioner通过哈希函数来分区,hash partition 的好处是比较弹性,跟数据类型无关,实现简单,只需要设置reducetask的个数。分区的目的是将整个大数据块分成多个数据块,通过多个reducetask处理后,输出多个文件。通常在输出数据需要有所区分的情况下使用自定义分区,如在上述的流量统计的案例里,如果需要最后的输出数据再根据手机号码的省份分成几个文件来存储,则需要自定义partition函数,并在驱动程序里设置reduce任务数等于分区数(job.setNumReduceTasks(5);)和指明自己定义的partition(job.setPartitionerClass(ProvincePartitioner.class))。在需要获取统一的输出结果的情况下,不需要自定义partition也不用设置reducetask的数量(默认1个)。
自定义的分区函数有时会导致数据倾斜的问题,即有的分区数据量极大,各个分区数据量不均匀,这会导致整个作业时间取决于处理时间最长的那个reduce,应尽量避免这种情况发生。
6.2、combiner(map端的reduce)
集群的带宽限制了mapreduce作业的数量,因此应该尽量避免map和reduce任务之间的数据传输。hadoop允许用户对map的输出数据进行处理,用户可自定义combiner函数(如同map函数和reduce函数一般),其逻辑一般和reduce函数一样,combiner的输入是map的输出,combiner的输出作为reduce的输入,很多情况下可以直接将reduce函数作为conbiner函数来使用。combiner属于优化方案,所以无法确定combiner函数会调用多少次,可以在环形缓存区溢出文件时调用combiner函数,也可以在溢出的小文件合并成大文件时调用combiner。但要保证不管调用几次combiner函数都不会影响最终的结果,所以不是所有处理逻辑都可以使用combiner组件,有些逻辑如果在使用了combiner函数后会改变最后rerduce的输出结果(如求几个数的平均值,就不能先用combiner求一次各个map输出结果的平均值,再求这些平均值的平均值,这将导致结果错误)。combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量。(原先传给reduce的数据是(a,(1,1,1,1,1,1…)),使用combiner后传给reduce的数据变为(a,(4,2,3,5…)))
6.3、分组
分组和上面提到的partition(分区)不同,分组发生在reduce端,reduce的输入数据,会根据key是否相等而分为一组,如果key相等的,则这些key所对应的value值会作为一个迭代器对象传给reduce函数。以单词统计为例,reduce输入的数据就如:第一组:(a,(1,3,5,3,1))第二组:(b,(6,2,3,1,5))。上述例子也可以看出在map端是执行过combiner函数的,否则reduce获得的输入数据是:第一组:(a,(1,1,1,1,1,…))第二组:(b,(1,1,1,1,1…))。对每一组数据调用一次reduce函数。
6.4、排序
在整个mapreduce过程中涉及到多处对数据的排序,环形缓存区溢出的文件,溢出的小文件合并成大文件,reduce端多个分区数据合并成一个大的分区数据等都需要排序,而这排序规则是根据key的compareTo方法来的。map端输出的数据的顺序不一定是reduce端输入数据的顺序,因为在这两者之间数据经过了排序,但reduce端输出到文件上显示的顺序就是reduce函数的写出顺序。在没有reduce函数的情况下,显示地在驱动函数里将reduce的数量设置为0(设置为0后表示没有reduce阶段,也就没有shuffle阶段,也就不会对数据进行各种排序分组),否则虽然没有reduce逻辑,但是还是会有shuffle阶段,map端处理完数据后将数据保存在文件上的顺序也不是map函数的写出顺序,而是经过shuffle分组排序过后的顺序