写磁盘时压缩map输出往往是个好主意,因为这样会让写磁盘的速度更快,节约磁盘空间,并减少传给reducer的数据量。默认情况下,输出是不压缩的,但只要将mapred.compress.map.output设置为true,就可以轻松启动该功能。使用的压缩库由mapred.map.output.compression.codec指定。
Reducer通过http方式得到输出文件的分区。用于文件分区的工作线程的数量由任务的tracker.http.threads属性控制,此设置针对每个TaskTracker,而不是针对每个map任务槽。默认值是40,在运行大型作业的大型集群上,此值可以根据需要而增加。
6.2.Reduce端
Map输出文件位于运行map任务的TaskTracker的本地磁盘,现在TaskTracker需要为分区文件运行reduce任务。更进一步,reduce任务需要集群上若干个map任务的map输出作为其特殊的分区文件。每个map任务的完成时间可能不同,因此只需要有一个任务完成,reduce任务就开始复制其输出。这就是reduce任务的复制阶段(copy phase)。Reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,但这个默认值可以通过设置mapred.reduce.parallel.copies属性来改变。
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Reducer如何知道要从那个TaskTracker取得map输出呢?
Map任务成功完成后,他们会通知父TaskTracker状态更新,然后TaskTracker进而通知JobTracker,这些通知在前面介绍的心跳通讯机制中传输。因此,对于指定作业,JobTracker知道map输出和TaskTracker之间的映射关系。Reducer中的一个线程定期询问JobTracker以获取map输出的位置,直到他获得所有输出位置。
由于reducer可能失败,因此TaskTracker并没有在第一个reducer检索到map输出时就立刻从磁盘上删除它们。相反,TaskTracker会等待,直到JobTracker告知它们可以删除map输出,这是作业完成后执行的。
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
如果map输出相当小,则会被复制到reduce TaskTracker的内存,否则,map输出被复制到磁盘。一旦内存缓冲区达到阀值大小(由mapred.iob.shuffle.merge.percent决定)或达到map输出阀值(由mapred.inmem.merge.threshold控制),则合并后溢出写到磁盘中。
随着磁盘复本的增多,后台线程会将他们合并为更大的、排好序的文件。mapreduce的工作原理这会为后面的合并节省一些空间。注意,为了合并,压缩的map输出都必须在内存中被解压缩。
复制完所有map输出,reduce任务进入排序阶段,这个阶段将合并map输出,维持其顺序排序。这是循环进行的。比如,如果有50个map输出,而合并因子是10(10为默认设置,由io.sort.factor属性设置,与map的合并类似),合并将进行5趟。每趟将10个文件合并成一个文件,因此最后有5个中间文件。
在最后阶段,即reduce阶段,直接把数据输入reduce函数,从而省略了一次磁盘往返行程,并没有将这5个文件合并成一个已排序的文件作为最后一趟。最后的的合并即可来自内存和磁盘片段。
在reduce阶段,对已排序输出中的每个键都调用reduce函数。该阶段的输出直接写到输出文件系统,一般为HDFS。如果采用HDFS,由于TaskTracker节点也运行数据节点,所以第一个块复本将被写到本地磁盘。
本文来自电脑杂谈,转载请注明本文网址:
http://www.pc-fly.com/a/jisuanjixue/article-37299-14.html
烊烊太萌了