| 我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。 MapReduce V1实现中,主要存在3个主要的分布式进程(角色):JobClient、JobTracker和TaskTracker,我们主要是以这三个角色的实际处理活动为主线,并结合源码,分析实际处理流程。下图是《Hadoop权威指南》一书给出的MapReduce 
                          V1处理Job的抽象流程图: 
 如上图,我们展开阴影部分的处理逻辑,详细分析Job提交在JobClient端的具体流程。 在编写好MapReduce程序以后,需要将Job提交给JobTracker,那么我们就需要了解在提交Job的过程中,在JobClient端都做了哪些工作,或者说执行了哪些处理。在JobClient端提交Job的处理流程,如下图所示: 
 上图所描述的Job的提交流程,说明如下所示: 在MR程序中创建一个Job实例,设置Job状态 创建一个JobClient实例,准备将创建的Job实例提交到JobTracker 在创建JobClient的过程中,首先必须保证建立到JobTracker的RPC连接 基于JobSubmissionProtocol协议远程调用JobTracker获取一个新的Job 
                          ID 根据MR程序中配置的Job,在HDFS上创建Job相关目录,并将配置的tmpfiles、tmpjars、tmparchives,以及Job对应jar文件等资源复制到HDFS 根据Job配置的InputFormat,计算该Job输入的Split信息和元数据(SplitMetaInfo)信息,以及计算出map和reduce的个数,最后将这些信息连通Job配置写入到HDFS(保证JobTracker能够读取) 通过JobClient基于JobSubmissionProtocol协议方法submitJob提交Job到JobTracker MR程序创建Job 下面的MR程序示例代码,已经很熟悉了: 
                           
                            | public static void main(String[] args) throws Exception {02
 Configuration conf = new Configuration();
 03
 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
 04
 if (otherArgs.length != 2) {
 05
 System.err.println("Usage: wordcount <in> <out>");
 06
 System.exit(2);
 07
 }
 08
 Job job = new Job(conf, "word count");
 09
 job.setJarByClass(WordCount.class);
 10
 job.setMapperClass(TokenizerMapper.class);
 11
 job.setCombinerClass(IntSumReducer.class);
 12
 job.setReducerClass(IntSumReducer.class);
 13
 job.setOutputKeyClass(Text.class);
 14
 job.setOutputValueClass(IntWritable.class);
 15
 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
 16
 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
 17
 System.exit(job.waitForCompletion(true) ? 0 : 1);
 18
 }
 
 |  在MR程序中,首先创建一个Job,并进行配置,然后通过调用Job的waitForCompletion方法将Job提交到MapReduce集群。这个过程中,Job存在两种状态:Job.JobState.DEFINE和Job.JobState.RUNNING,创建一个Job后,该Job的状态为Job.JobState.DEFINE,Job内部通过JobClient基于org.apache.hadoop.mapred.JobSubmissionProtocol协议提交给JobTracker,然后该Job的状态变为Job.JobState.RUNNING。 Job提交目录submitJobDir 通过如下代码可以看到,Job提交目录是如何创建的: 
                           
                            | JobConf jobCopy = job;Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this, jobCopy); // 获取到StagingArea目录
 JobID jobId = jobSubmitClient.getNewJobId();
 Path submitJobDir = new Path(jobStagingArea, jobId.toString());
 
 |  获取StagingArea目录,JobClient需要通过JobSubmissionProtocol协议的远程方法getStagingAreaDir从JobTracker端获取到,我们看一下JobTracker端的getStagingAreaDirInternal方法,如下所示: 
                           
                            | private String getStagingAreaDirInternal(String user) throws IOException {2
 final Path stagingRootDir = new Path(conf.get("mapreduce.jobtracker.staging.root.dir", "/tmp/hadoop/mapred/staging"));
 3
 final FileSystem fs = stagingRootDir.getFileSystem(conf);
 4
 return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
 5
 }
 
 |  最终获取到的StagingArea目录为${mapreduce.jobtracker.staging.root.dir}/${user}/.staging/,例如,如果使用默认的mapreduce.jobtracker.staging.root.dir值,用户为shirdrn,则StagingArea目录/tmp/hadoop/mapred/staging/shirdrn/.staging/。通过Path 
                          submitJobDir = new Path(jobStagingArea, jobId.toString());可以得到submitJobDir,假如一个job的ID为job_200912121733_0002,则submitJobDir的值为/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/ 拷贝资源文件 在配置Job的时候,可以指定tmpfiles、tmpjars、tmparchives,JobClient会将对应的资源文件拷贝到指定的目录中,对应目录如下代码所示: 
                           
                            | Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);2
 Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
 3
 Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
 4
 ...
 5
 Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir);
 6
 job.setJar(submitJarFile.toString());
 7
 fs.copyFromLocalFile(originalJarFile, submitJarFile);
 
 |  上面已经知道Job提交目录,可以分别得到对应的资源所在目录: tmpfiles目录:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/files tmpjars目录:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/libjars tmparchives目录:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/archives Job Jar文件:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.jar 然后,就可以将对应的资源文件拷贝到对应的目录中。 计算并存储Split数据 根据Job配置中设置的InputFormat,计算该Job的数据数据文件是如何进行分片的,代码如下所示: 
                           
                            | Configuration conf = job.getConfiguration();2
 InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
 3
 List<InputSplit> splits = input.getSplits(job);
 
 |  实际上就是调用InputFormat的getSplits方法,如果不适用Hadoop自带的FileInputFormat的默认getSplits方法实现,可以自定义实现,重写该默认实现逻辑来定义数据数据文件分片的规则。计算出输入文件的分片信息,然后需要将这些分片数据写入到HDFS供JobTracker查询初始化MapTask,写入分片数据的实现代码:
 
                           
                            | T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);2
 // sort the splits into order based on size, so that the biggest
 3
 // go first
 4
 Arrays.sort(array, new SplitComparator()); // 根据InputSplit的长度做了一个逆序排序
 5
 JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array); // 将split及其元数据信息写入HDFS
 |  接着调用JobSplitWriter.createSplitFiles方法存储Split信息,并创建元数据信息,并保存元数据信息。存储Split信息,代码实现如下所示: 
                           
                            | SerializationFactory factory = new SerializationFactory(conf);02
 int i = 0;
 03
 long offset = out.getPos();
 04
 for(T split: array) {
 05
 long prevCount = out.getPos();
 06
 Text.writeString(out, split.getClass().getName());
 07
 Serializer<T> serializer = factory.getSerializer((Class<T>) split.getClass());
 08
 serializer.open(out);
 09
 serializer.serialize(split); // 将split序列化写入到HDFS文件中
 10
 long currCount = out.getPos();
 11
 String[] locations = split.getLocations();
 12
 final int max_loc = conf.getInt(MAX_SPLIT_LOCATIONS, 10);
 13
 if (locations.length > max_loc) {
 14
 LOG.warn("Max block location exceeded for split: "+ split + " splitsize: " + locations.length + " maxsize: " + max_loc);
 15
 locations = Arrays.copyOf(locations, max_loc);
 16
 }
 17
 info[i++] = new JobSplit.SplitMetaInfo(locations, offset, split.getLength()); // 创建SplitMetaInfo实例
 18
 offset += currCount - prevCount;
 19
 }
 
 |  我们先看一下FileSplit包含的分片内容,如下所示: 
                           
                            | private Path file;2
 private long start;
 3
 private long length;
 4
 private String[] hosts;
 
 |  在序列化保存FileSplit到HDFS,可以通过查看FileSplit的write方法,如下所示: 
                           
                            | @Override2
 public void write(DataOutput out) throws IOException {
 3
 Text.writeString(out, file.toString());
 4
 out.writeLong(start);
 5
 out.writeLong(length);
 6
 }
 
 |  需要注意的是,这里面并没有将FileSplit的hosts信息保存,而是存储到了SplitMetaInfo中new 
                          JobSplit.SplitMetaInfo(locations, offset, split.getLength())。 下面是保存SplitMetaInfo信息的实现: 
                           
                            | private static void writeJobSplitMetaInfo(FileSystem fs, Path filename,02
 FsPermission p, int splitMetaInfoVersion,
 03
 JobSplit.SplitMetaInfo[] allSplitMetaInfo) throws IOException {
 04
 // write the splits meta-info to a file for the job tracker
 05
 FSDataOutputStream out = FileSystem.create(fs, filename, p);
 06
 out.write(JobSplit.META_SPLIT_FILE_HEADER); 
// 写入META头信息:META_SPLIT_FILE_HEADER = "META-SPL".getBytes("UTF-8");
 07
 WritableUtils.writeVInt(out, splitMetaInfoVersion); // META版本信息:1
 08
 WritableUtils.writeVInt(out, allSplitMetaInfo.length); // META对象的数量:每个InputSplit对应一个SplitMetaInfo
 09
 for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {
 10
 splitMetaInfo.write(out); // 每个都进行存储
 11
 }
 12
 out.close();
 13
 }
 
 |  看一下SplitMetaInfo存储时包含的数据信息: 
                           
                            | public void write(DataOutput out) throws IOException {2
 WritableUtils.writeVInt(out, locations.length); // location个数
 3
 for (int i = 0; i < locations.length; i++) {
 4
 Text.writeString(out, locations[i]); // 写入每一个location位置信息
 5
 }
 6
 WritableUtils.writeVLong(out, startOffset); // 偏移量
 7
 WritableUtils.writeVLong(out, inputDataLength); // 数据长度
 8
 }
 
 |  最后,我们看一下这些数据保存的目录和文件情况。前面已经知道Job提交目录,下面看split存储的文件是如何构建的: 
                           
                            | FSDataOutputStream out = createFile(fs, JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);2
 SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
 
 
 |  那么split保存的文件为:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.split。 同样,split元数据信息文件构建如下所示: 
                           
                            | writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),2
 new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion, info);
 
 |  split元数据信息文件为:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.splitmetainfo。 保存Job配置数据 在提交Job到JobTracker之前,还需要保存Job的配置信息,这些配置数据根据用户在MR程序中配置,覆盖默认的配置值,最后保存到XML文件(job.xml)到HDFS,供JobTracker查询。如下代码,创建submitJobFile文件并写入job配置数据: 
                           
                            | ...02
 Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
 03
 jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
 04
 ...
 05
 // Write job file to JobTracker's fs
 06
 FSDataOutputStream out = FileSystem.create(fs, submitJobFile, 
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
 07
 ...
 08
 try {
 09
 jobCopy.writeXml(out);
 10
 } finally {
 11
 out.close();
 12
 }
 
 |  前面已经知道Job提交目录,我们很容易就能得到job.xml文件的存储路径:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.xml。 最后,所有的数据都已经准备完成,JobClient就可以基于JobSubmissionProtocol协议方法submitJob,提交Job到JobTracker运行。 |