您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Modeler   Code  
会员   
 
   
 
 
     
   
 订阅
  捐助
Hadoop二次开发必懂
 
作者:真实的归宿 来源:CSDN 发布于:2014-12-22
   次浏览      
 

MapReduce概论

大家都熟悉文件系统,在对HDFS进行分析前,我们并没有花很多的时间去介绍HDFS的背景,毕竟大家对文件系统的还是有一定的理解的,而且也有很好的文档。在分析Hadoop的MapReduce部分前,我们还是先了解系统是如何工作的,然后再进入我们的分析部分。下面的图来是我看到的讲MapReduce最好的图。

以Hadoop带的wordcount为例子(下面是启动行):

hadoop jar hadoop-0.19.0-examples.jar wordcount /usr/input /usr/output

用户提交一个任务以后,该任务由JobTracker协调,先执行Map阶段(图中M1,M2和M3),然后执行Reduce阶段(图中R1和R2)。Map阶段和Reduce阶段动作都受TaskTracker监控,并运行在独立于TaskTracker的Java虚拟机中。

我们的输入和输出都是HDFS上的目录(如上图所示)。输入由InputFormat接口描述,它的实现如ASCII文件,JDBC数据库等,分别处理对于的数据源,并提供了数据的一些特征。通过InputFormat实现,可以获取InputSplit接口的实现,这个实现用于对数据进行划分(图中的splite1到splite5,就是划分以后的结果),同时从InputFormat也可以获取RecordReader接口的实现,并从输入中生成<k,v>对。有了<k,v>,就可以开始做map操作了。

map操作通过context.collect(最终通过OutputCollector. collect)将结果写到context中。当Mapper的输出被收集后,它们会被Partitioner类以指定的方式区分地写出到输出文件里。我们可以为Mapper提供Combiner,在Mapper输出它的<k,v>时,键值对不会被马上写到输出里,他们会被收集在list里(一个key值一个list),当写入一定数量的键值对时,这部分缓冲会被Combiner中进行合并,然后再输出到Partitioner中(图中M1的黄颜色部分对应着Combiner和Partitioner)。

Map的动作做完以后,进入Reduce阶段。这个阶段分3个步骤:混洗(Shuffle),排序(sort)和reduce。

混洗阶段,Hadoop的MapReduce框架会根据Map结果中的key,将相关的结果传输到某一个Reducer上(多个Mapper产生的同一个key的中间结果分布在不同的机器上,这一步结束后,他们传输都到了处理这个key的Reducer的机器上)。这个步骤中的文件传输使用了HTTP协议。

排序和混洗是一块进行的,这个阶段将来自不同Mapper具有相同key值的<key,value>对合并到一起。Reduce阶段,上面通过Shuffle和sort后得到的<key, (list of values)>会送到Reducer. reduce方法中处理,输出的结果通过OutputFormat,输出到DFS中。

MapTask

接下来我们来分析Task的两个子类,MapTask和ReduceTask。MapTask的相关类图如下:

MapTask其实不是很复杂,复杂的是支持MapTask工作的一些辅助类。MapTask的成员变量少,只有split和splitClass。我们知道,Map的输入是split,是原始数据的一个切分,这个切分由org.apache.hadoop.mapred.InputSplit的子类具体描述(前面我们是通过org.apache.hadoop.mapreduce.InputSplit介绍了InputSplit,它们对外的API是一样的)。splitClass是InputSplit子类的类名,通过它,我们可以利用Java的反射机制,创建出InputSplit子类。而split是一个BytesWritable,它是InputSplit子类串行化以后的结果,再通过InputSplit子类的readFields方法,我们可以回复出对应的InputSplit对象。

MapTask最重要的方法是run。run方法相当简单,配置完系统的TaskReporter后,就根据情况执行runJobCleanupTask,runJobSetupTask,runTaskCleanupTask或执行Mapper。由于MapReduce现在有两套API,MapTask需要支持这两套API,使得MapTask执行Mapper分为runNewMapper和runOldMapper,run*Mapper后,MapTask会调用父类的done方法。

接下来我们来分析runOldMapper,最开始部分是构造Mapper处理的InputSplit,更新Task的配置,然后就开始创建Mapper的RecordReader,rawIn是原始输入,然后分正常(使用TrackedRecordReader,后面讨论)和跳过部分记录(使用SkippingRecordReader,后面讨论)两种情况,构造对应的真正输入in。

跳过部分记录是Map的一种出错恢复策略,我们知道,MapReduce处理的数据集合非常大,而有些任务对一部分出错的数据不进行处理,对结果的影响很小(如大数据集合的一些统计量),那么,一小部分的数据出错导致已处理的大量结果无效,是得不偿失的,跳过这部分记录,成了Mapper的一种选择。

Mapper的输出,是通过MapOutputCollector进行的,也分两种情况,如果没有Reducer,那么,用DirectMapOutputCollector(后面讨论),否则,用MapOutputBuffer(后面讨论)。

构造完Mapper的输入输出,通过构造配置文件中配置的MapRunnable,就可以执行Mapper了。目前系统有两个MapRunnable:MapRunner和MultithreadedMapRunner,如下图。

原有API在这块的处理上和新API有很大的不一样。接口MapRunnable是原有API中Mapper的执行器,run方法就是用于执行用户的Mapper。MapRunner是单线程执行器,相当简单,首先,当MapTask调用:

MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner 
=ReflectionUtils.newInstance(job.getMapRunnerClass(), job);

MapRunner的configure会在newInstance的最后被调用,configure执行的过程中,对应的Mapper会通过反射机制构造出来。

MapRunner的run方法,会先创建对应的key,value对象,然后,对InputSplit的每一对<key,value>,调用Mapper的map方法,循环结束后,Mapper对应的清理方法会被调用。我们需要注意,key,value对象在run方法中是被重复使用的,就是说,每次传入Mapper的map方法的key,value都是同一个对象,只不过是里面的内容变了,对象并没有变。如果你需要保留key,value的内容,需要实现clone机制,克隆出对象的一个新备份。

相对于新API的多线程执行器,老API的MultithreadedMapRunner就比较复杂了,总体来说,就是通过阻塞队列配合Java的多线程执行器,将<key,value>分发到多个线程中去处理。需要注意的是,在这个过程中,这些线程共享一个Mapper实例,如果Mapper有共享的资源,需要有一定的保护机制。runNewMapper用于执行新版本的Mapper,比runOldMapper稍微复杂,我们就不再讨论了。

辅助类1:

MapTask的辅助类主要针对Mapper的输入和输出。首先我们来看MapTask中用的的Mapper输入,在类图中,这部分位于右上角。

MapTask.TrackedRecordReader是一个Wrapper,在原有输入RecordReader的基础上,添加了收集上报统计数据的功能。

MapTask.SkippingRecordReader也是一个Wrapper,它在MapTask.TrackedRecordReader的基础上,添加了忽略部分输入的功能。在分析MapTask.SkippingRecordReader之前,我们先看一下类SortedRanges和它相关的类。

类SortedRanges.Ranges表示了一个范围,以开始位置和范围长度(这样的话就可以表示长度为0的范围)来表示一个范围,并提供了一系列的范围操作方法。注意,方法getEndIndex得到的右端点并不包含在范围内(应理解为开区间)。SortedRanges包含了一系列不重叠的范围,为了保证包含的范围不重叠,在add方法和remove方法上需要做一些处理,保证不重叠的约束。SkipRangeIterator是访问SortedRanges包含的Ranges的迭代器。

MapTask.SkippingRecordReader的实现很简单,因为要忽略的输入都保持在SortedRanges.Ranges,只需要在next方法中,判断目前范围时候落在SortedRanges.Ranges中,如果是,忽略,并将忽略的记录写文件(可配置)

NewTrackingRecordReader和NewOutputCollector被新API使用,我们不分析。

MapTask的输出辅助类都继承自MapOutputCollector,它只是在OutputCollector的基础上添加了close和flush方法。

DirectMapOutputCollector用在Reducer的数目为0,就是不需要Reduce阶段的时候。它是直接通过out = job.getOutputFormat().getRecordWriter(fs,job, finalName, reporter);得到对应的RecordWriter,collect直接到RecordWriter上。

如果Mapper后续有reduce任务,系统会使用MapOutputBuffer做为输出,这是个比较复杂的类,有1k行左右的代码。

我们知道,Mapper是通过OutputCollector将Map的结果输出,输出的量很大,Hadoop的机制是通过一个circle buffer 收集Mapper的输出, 到了io.sort.mb * percent量的时候,就spill到disk,如下图。图中出现了两个数组和一个缓冲区,kvindices保持了记录所属的(Reduce)分区,key在缓冲区开始的位置和value在缓冲区开始的位置,通过kvindices,我们可以在缓冲区中找到对应的记录。kvoffets用于在缓冲区满的时候对kvindices的partition进行排序,排完序的结果将输出到输出到本地磁盘上,其中索引(kvindices)保持在spill{spill号}.out.index中,数据保存在spill{spill号}.out中。

当Mapper任务结束后,有可能会出现多个spill文件,这些文件会做一个归并排序,形成Mapper的一个输出(spill.out和spill.out.index),如下图:

这个输出是按partition排序的,这样的话,Mapper的输出被分段,Reducer要获取的就是spill.out中的一段。(注意,内存和硬盘上的索引结构不一样)

辅助类2:

有了上面Mapper输出的内存存储结构和硬盘存储结构讨论,我们来仔细分析MapOutputBuffer的流程。

首先是成员变量。最先初始化的是作业配置job和统计功能reporter。通过配置,MapOutputBuffer可以获取本地文件系统(localFs和rfs),Reducer的数目和Partitioner。

SpillRecord是文件spill.out{spill号}.index在内存中的对应抽象(内存数据和文件数据就差最后的校验和),该文件保持了一系列的IndexRecord,如下图:

IndexRecord有3个字段,分别是startOffset:记录偏移量,rawLength:初始长度,partLength:实际长度(可能有压缩)。SpillRecord保持了一系列的IndexRecord,并提供方法用于添加记录(没有删除记录的操作,因为不需要),获取记录,写文件,读文件(通过构造函数)。

接下来是一些和输出缓存区kvbuffer,缓存区记录索引kvindices和缓存区记录索引排序工作数组kvoffsets相关的处理,下面的图有助于说明这段代码。

这部分依赖于3个配置参数,io.sort.spill.percent是kvbuffer,kvindices和kvoffsets的总大小(以M为单位,缺省是100,就是100M,这一部分是MapOutputBuffer中占用存储最多的)。io.sort.record.percent是kvindices和kvoffsets占用的空间比例(缺省是0.05)。前面的分析我们已经知道kvindices和kvoffsets,如果记录数是N的话,它占用的空间是4N*4bytes,根据这个关系和io.sort.record.percent的值,我们可以计算出kvindices和kvoffsets最多能有多少个记录,并分配相应的空间。参数io.sort.spill.percent指示当输出缓冲区或kvindices和kvoffsets记录数量到达对应的占用率的时候,会启动spill,将内存缓冲区的记录存放到硬盘上,softBufferLimit和softRecordLimit为对应的字节数。

值对<key, value>输出到缓冲区是通过Serializer串行化的,这部分的初始化跟在上面输出缓存后面。接下来是一些计数器和可能的数据压缩处理器的初始化,可能的Combiner和combiner工作的一些配置。

最后是启动spillThread,该Thread会检查内存中的输出缓存区,在满足一定条件的时候将缓冲区中的内容spill到硬盘上。这是一个标准的生产者-消费者模型,MapTask的collect方法是生产者,spillThread是消费者,它们之间同步是通过spillLock(ReentrantLock)和spillLock上的两个条件变量(spillDone和spillReady)完成的。

先看生产者,MapOutputBuffer.collect的主要流程是:

1.报告进度和参数检测(<K, V>符合Mapper的输出约定);

2.spillLock.lock(),进入临界区;

3.如果达到spill条件,设置变量并通过spillReady.signal(),通知spillThread;并等待spill结束(通过spillDone.await()等待);

4.spillLock.unlock();

5.输出key,value并更新kvindices和kvoffsets(注意,方法collect是synchronized,key和value各自输出,它们也会占用连续的输出缓冲区);

kvstart,kvend和kvindex三个变量在判断是否需要spill和spill是否结束的过程中很重要,kvstart是有效记录开始的下标,kvindex是下一个可做记录的位置,kvend的作用比较特殊,它在一般情况下kvstart==kvend,但开始spill的时候它会被赋值为kvindex的值,spill结束时,它的值会被赋给kvstart,这时候kvstart==kvend。这就是说,如果kvstart不等于kvend,系统正在spill,否则,kvstart==kvend,系统处于普通工作状态。其实在代码中,我们可以看到很多kvstart==kvend的判断。

下面我们分情况,讨论kvstart,kvend和kvindex的配合。初始化的时候,它们都被赋值0。

下图给出了一个没有spill的记录添加过程:

注意kvindex和kvnext的关系,取模实现了循环缓冲区

如果在添加记录的过程中,出现spill(多种条件),那么,主要的过程如下:

首先还是计算kvnext,主要,这个时候kvend==kvstart(图中没有画出来)。如果spill条件满足,那么,kvindex的值会赋给kvend(这是kvend不等于kvstart),从kvstart和kvend的大小关系,我们可以知道记录位于数组的那一部分(左边是kvstart<kvend的情况,右边是另外的情况)。Spill结束的时候,kvend值会被赋给kvstart, kvend==kvstart又重新满足,同时,我们可以发现kvindex在这个过程中没有变化,新的记录还是写在kvindex指向的位置,然后,kvindex=kvnect,kvindex移到下一个可用位置。

大家体会一下上面的过程,特别是kvstart,kvend和kvindex的配合,其实,<key,value>对输出使用的缓冲区,也有类似的过程。

Collect在处理<key,value>输出时,会处理一个MapBufferTooSmallException,这是value的串行化结果太大,不能一次放入缓冲区的指示,这种情况下我们需要调用spillSingleRecord,特殊处理。

辅助类3:

接下来讨论的是key,value的输出,这部分比较复杂,不过有了前面kvstart,kvend和kvindex配合的分析,有利于我们理解这部分的代码。输出缓冲区中,和kvstart,kvend和kvindex对应的是bufstart,bufend和bufmark。这部分还涉及到变量bufvoid,用于表明实际使用的缓冲区结尾(见后面BlockingBuffer.reset分析),和变量bufmark,用于标记记录的结尾。这部分代码需要bufmark,是因为key或value的输出是变长的,(前面元信息记录大小是常量,就不需要这样的变量)。最好的情况是缓冲区没有翻转和value串行化结果很小,如下图:

先对key串行化,然后对value做串行化,临时变量keystart,valstart和valend分别记录了key结果的开始位置,value结果的开始位置和value结果的结束位置。

串行化过程中,往缓冲区写是最终调用了Buffer.write方法,我们后面再分析。

如果key串行化后出现bufindex < keystart,那么会调用BlockingBuffer的reset方法。原因是在spill的过程中需要对<key,value>排序,这种情况下,传递给RawComparator的必须是连续的二进制缓冲区,通过BlockingBuffer.reset方法,解决这个问题。下图解释了如何解决这个问题:

当发现key的串行化结果出现不连续的情况时,我们会把bufvoid设置为bufmark,见缓冲区开始部分往后挪,然后将原来位于bufmark到bufvoid出的结果,拷到缓冲区开始处,这样的话,key串行化的结果就连续存放在缓冲区的最开始处。

上面的调整有一个条件,就是bufstart前面的缓冲区能够放下整个key串行化的结果,如果不能,处理的方式是将bufindex置0,然后调用BlockingBuffer内部的out的write方法直接输出,这实际调用了Buffer.write方法,会启动spill过程,最终我们会成功写入key串行化的结果。

下面我们看write方法。key,value串行化过程中,往缓冲区写数据是最终调用了Buffer.write方法,又是一个复杂的方法。

do-while循环,直到我们有足够的空间可以写数据(包括缓冲区和kvindices和kvoffsets)

首先我们计算缓冲区连续写是否写满标志buffull和缓冲区非连续情况下有足够写空间标志wrap(这个实在拗口),见下面的讨论;条件(buffull && !wrap)用于判断目前有没有足够的写空间;

在spill没启动的情况下(kvstart == kvend),分两种情况,如果数组中有记录(kvend != kvindex),那么,根据需要(目前输出空间不足或记录数达到spill条件)启动spill过程;否则,如果空间还是不够(buffull && !wrap),表明这个记录非常大,以至于我们的内存缓冲区不能容下这么大的数据量,抛MapBufferTooSmallException异常;

如果空间不足同时spill在运行,等待spillDone;

写数据,注意,如果buffull,则写数据会不连续,则写满剩余缓冲区,然后设置bufindex=0,并从bufindex处接着写。否则,就是从bufindex处开始写。

下图给出了缓冲区连续写是否写满标志buffull和缓冲区非连续情况下有足够写空间标志wrap计算的几种可能:

情况1和情况2中,buffull判断为从bufindex到bufvoid是否有足够的空间容纳写的内容,wrap是图中白颜色部分的空间是否比输入大,如果是,wrap为true;情况3和情况4中,buffull判断bufindex到bufstart的空间是否满足条件,而wrap肯定是false。明显,条件(buffull && !wrap)满足时,目前的空间不够一次写。

接下来我们来看spillSingleRecord,只是用于写放不进内存缓冲区的<key,value>对。过程很流水,首先是创建SpillRecord记录,输出文件和IndexRecord记录,然后循环,构造SpillRecord并在恰当的时候输出记录(如下图),最后输出spill{n}.index文件。

前面我们提过spillThread,在这个系统中它是消费者,这个消费者相当简单,需要spill时调用函数sortAndSpill,进行spill。sortAndSpill和spillSingleRecord类似,函数的开始也是创建SpillRecord记录,输出文件和IndexRecord记录,然后,需要在kvoffsets上做排序,排完序后顺序访问kvoffsets,也就是按partition顺序访问记录。

按partition循环处理排完序的数组,如果没有combiner,则直接输出记录,否则,调用combineAndSpill,先做combin然后输出。循环的最后记录IndexRecord到SpillRecord。

sortAndSpill最后是输出spill{n}.index文件。

combineAndSpill比价简单,我们就不分析了。

BlockingBuffer中最后要分析的方法是flush方法。调用flush方法,意味着Mapper的结果都已经collect了,需要对缓冲区做一些最后的清理,并合并spill{n}文件产生最后的输出。

缓冲区处理部分很简单,先等待可能的spill过程完成,然后判断缓冲区是否为空,如果不是,则调用sortAndSpill,做最后的spill,然后结束spill线程。

flush合并spill{n}文件是通过mergeParts方法。如果Mapper最后只有一个spill{n}文件,简单修改该文件的文件名就可以。如果Mapper没有任何输出,那么我们需要创建哑输出(dummy files)。如果spill{n}文件多于1个,那么按partition循环处理所有文件,将处于处理partition的记录输出。处理partition的过程中可能还会再次调用combineAndSpill,最记录再做一次combination,其中还涉及到工具类Merger,我们就不再深入研究了。

从前面的图中,我们可以发现Task有很多内部类,并拥有大量类成员变量,这些类配合Task完成相关的工作,如下图。

MapOutputFile管理着Mapper的输出文件,它提供了一系列get方法,用于获取Mapper需要的各种文件,这些文件都存放在一个目录下面。我们假设传入MapOutputFile的JobID为job_200707121733_0003,TaskID为task_200707121733_0003_m_000005。MapOutputFile的根为{mapred.local.dir}/taskTracker/jobcache/{jobid}/{taskid}/output在下面的讨论中,我们把上面的路径记为{MapOutputFileRoot}

以上面JogID和TaskID为例,我们有:{mapred.local.dir}/taskTracker/jobcache/job_200707121733_0003/task_200707121733_0003_m_000005/output需要注意的是,{mapred.local.dir}可以包含一系列的路径,那么,Hadoop会在这些根路径下找一个满足要求的目录,建立所需的文件。MapOutputFile的方法有两种,结尾带ForWrite和不带ForWrite,带ForWrite用于创建文件,它需要一个文件大小作为参数,用于检查磁盘空间。不带ForWrite用于获取以建立的文件。

getOutputFile:文件名为{MapOutputFileRoot}/file.out; getOutputIndexFile:文件名为{MapOutputFileRoot}/file.out.indexgetSpillFile:文件名为{MapOutputFileRoot}/spill{spillNumber}.outgetSpillIndexFile:文件名为{MapOutputFileRoot}/spill{spillNumber}.out.index以上四个方法用于Task子类MapTask中;getInputFile:文件名为{MapOutputFileRoot}/map_{mapId}.out用于ReduceTask中。我们到使用到他们的地方再介绍相应的应用场景。

介绍完临时文件管理以后,我们来看Task.CombineOutputCollector,它继承自org.apache.hadoop.mapred.OutputCollector,很简单,只是一个OutputCollector到IFile.Writer的Adapter,活都让IFile.Writer干了。 ValuesIterator用于从RawKeyValueIterator(Key,Value都是DataInputBuffer,ValuesIterator要求该输入已经排序)中获取符合RawComparator<KEY> comparator的值的迭代器。它在Task中有一个简单子类,CombineValuesIterator。

Task.TaskReporter用于向JobTracker提交计数器报告和状态报告,它实现了计数器报告Reporter和状态报告StatusReporter。为了不影响主线程的工作,TaskReporter有一个独立的线程,该线程通过TaskUmbilicalProtocol接口,利用Hadoop的RPC机制,向JobTracker报告Task执行情况。

FileSystemStatisticUpdater用于记录对文件系统的对/写操作字节数,是个简单的工具类。

   
次浏览       
相关文章

基于EA的数据库建模
数据流建模(EA指南)
“数据湖”:概念、特征、架构与案例
在线商城数据库系统设计 思路+效果
 
相关文档

Greenplum数据库基础培训
MySQL5.1性能优化方案
某电商数据中台架构实践
MySQL高扩展架构设计
相关课程

数据治理、数据架构及数据标准
MongoDB实战课程
并发、大容量、高性能数据库设计与优化
PostgreSQL数据库实战培训
最新活动计划
LLM大模型应用与项目构建 12-26[特惠]
QT应用开发 11-21[线上]
C++高级编程 11-27[北京]
业务建模&领域驱动设计 11-15[北京]
用户研究与用户建模 11-21[北京]
SysML和EA进行系统设计建模 11-28[北京]

MySQL索引背后的数据结构
MySQL性能调优与架构设计
SQL Server数据库备份与恢复
让数据库飞起来 10大DB2优化
oracle的临时表空间写满磁盘
数据库的跨平台设计
更多...   


并发、大容量、高性能数据库
高级数据库架构设计师
Hadoop原理与实践
Oracle 数据仓库
数据仓库和数据挖掘
Oracle数据库开发与管理


GE 区块链技术与实现培训
航天科工某子公司 Nodejs高级应用开发
中盛益华 卓越管理者必须具备的五项能力
某信息技术公司 Python培训
某博彩IT系统厂商 易用性测试与评估
中国邮储银行 测试成熟度模型集成(TMMI)
中物院 产品经理与产品管理
更多...