Hadoop常常被用作大型数据处理生态系统中的一部分。它的优势在于能够批量地处理大量数据,并将结果以最好的方式与其他系统相集成。从高层次角度来看,整个过程就是Hadoop接收输入文件、使用自定义转换(Map-Reduce步骤)获得内容流,以及将输出文件的结果写回磁盘。上个月InfoQ展示了怎样在第一个步骤中,使用InputFormat类来更好地对接收输入文件进行控制。而在本文中,我们将同大家一起探讨怎样自定义最后一个步骤——即怎样写入输出文件。OutputFormat将Map/Reduce作业的输出结果转换为其他应用程序可读的方式,从而轻松实现与其他系统的互操作。为了展示OutputFormts的实用性,我们将用两个例子进行讨论:如何拆分作业结果到不同目录以及如何为提供快速键值查找的服务写入文件。
OutputFormats是做什么的?
OutputFormt接口决定了在哪里以及怎样持久化作业结果。Hadoop为不同类型的格式提供了一系列的类和接口,实现自定义操作只要继承其中的某个类或接口即可。你可能已经熟悉了默认的OutputFormat,也就是TextOutputFormat,它是一种以行分隔,包含制表符界定的键值对的文本文件格式。尽管如此,对多数类型的数据而言,如再常见不过的数字,文本序列化会浪费一些空间,由此带来的结果是运行时间更长且资源消耗更多。为了避免文本文件的弊端,Hadoop提供了SequenceFileOutputformat,它将对象表示成二进制形式而不再是文本文件,并将结果进行压缩。下面是Hadoop提供的类层次结构:
- FileOutputFormat(实现OutputFormat接口)—— 所有OutputFormats的基类
- MapFileOutputFormat —— 一种使用部分索引键的格式
- SequenceFileOutputFormat —— 二进制键值数据的压缩格式
- SequenceFileAsBinaryOutputFormat —— 原生二进制数据的压缩格式
- TextOutputFormat —— 以行分隔、包含制表符定界的键值对的文本文件格式
- MultipleOutputFormat —— 使用键值对参数写入文件的抽象类
- MultipleTextOutputFormat —— 输出多个以标准行分割、制表符定界格式的文件
- MultipleSequenceFileOutputFormat —— 输出多个压缩格式的文件
OutputFormat提供了对RecordWriter的实现,从而指定如何序列化数据。
RecordWriter类可以处理包含单个键值对的作业,并将结果写入到OutputFormat中准备好的位置。RecordWriter的实现主要包括两个函数:“write”和“close”。“write”函数从Map/Reduce作业中取出键值对,并将其字节写入磁盘。LineRecordWriter是默认使用的RecordWriter,它是前面提到的TextOutputFormat的一部分。它写入的内容包括:
- 键(key)的字节 (由getBytes()函数返回)
- 一个用以定界的制表符
- 值(value)的字节(同样由getBytes()函数返回)
- 一个换行符
“close”函数会关闭Hadoop到输出文件的数据流。
我们已经讨论了输出数据的格式,下面我们关心的问题是数据存储在何处?同样,你或许看到过某个作业的输出结果会以多个“部分”文件的方式存储在输出目录中,如下:
|
|-- output-directory
| |-- part-00000
| |-- part-00001
| |-- part-00002
| |-- part-00003
| |-- part-00004
'-- part-00005 |
默认情况下,当需要写入数据时,每个进程都会在输出目录创建自己的文件。数据由reducers在作业结束时写入(如果没有reducers会由mapper写入)。即使在本文后面提到的创建自定义输出目录时,我们仍会保持写入“部分”文件,这么做可以让多个进程同时写入同一个目录而互不干扰。
自定义OutputFormat
从前面我们已经看到,OutputFormat类的主要职责是决定数据的存储位置以及写入的方式。那么为什么要自定义这些行为呢?自定义数据位置的原因之一是为了将Map/Reduce作业输出分离到不同的目录。例如,假设需要处理一个包含世界范围内的搜索请求的日志文件,并希望计算出每个国家的搜索频度。你想要在不牵涉其他国家的前提下能够查看某个特定国家的结果。也许以后在你的数据管道中,会用不同的进程来处理不同的国家,或者想要把某个特定国家的结果复制一份到该国的数据中心去。使用默认的OutputFormat时,所有的数据都会存储在同一目录下,这样在不浏览的情况下是无从知晓“部分”文件的内容的。而通过使用自定义的OutputFormat,你可以为每个国家创建一个子目录的布局,如下:
|
|-- output-directory
| |-- France
| | |-- part-00000
| | |-- part-00001
| | '-- part-00002
... |
| '-- Zimbabwe
| |-- part-00000
| |-- part-00001
| '-- part-00002 |
其中每个部分文件都具有键值对(“搜索词汇”=>频度)。现在只要简单地指定某个国家数据所在的路径,就可以只读取该国家的数据了。下面我们将看到怎样继承MultipleTextOutputFormat类,以获得所需的行为。
自定义OutputFormat还有一些其他的原因,以名为ElephantDB的项目为例, 它将数据以一种面向消费应用程序的“本地”形式进行存储。这个项目的设立是为了让Map/Reduece作业结果可以像分布式服务一样被查询。ElephantDB写入的并不是文本文件,而是使用自定义的OutputFormat将结果写成BerkeleyDB文件,其中这些文件使用作业输出的键进行索引。之后使用某个服务加载BerkeleyDB文件,可以提供低延滞的任意键查找。类似的系统还有HBase和Voldemort,它们可以存储Hadoop生成的键值数据。ElephantDB重点关注的是怎样与Hadoop批量式更新进行简易紧密的集成。
多路输出
为了解决上面的搜索日志的问题,我们继承了MultipleTextOutputFormat类,并根据被写入的键值来选择输出目录。我们的Map/Reduce作业将会为搜索请求所在国家生成一个键,并为搜索词汇及该搜索的频度产生一个值。由于MultipleTextOutputFormat已经知道如何写入文本文件,因此并不需要为OutputFormat实现序列化功能。清单1实现了该类:
|
1 package oddjob.hadoop;
2
3 import org.apache.hadoop.fs.Path;
4 import org.apache.hadoop.io.Text;
5 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
6
7 public class MultipleTextOutputFormatByKey extends MultipleTextOutputFormat<Text, Text> {
8
9 /**
10 * Use they key as part of the path for the final output file.
11 */
12 @Override
13 protected String generateFileNameForKeyValue(Text key, Text value, String leaf) {
14 return new Path(key.toString(), leaf).toString();
15 }
16
17 /**
18 * When actually writing the data, discard the key since it is already in
19 * the file path.
20 */
21 @Override
22 protected Text generateActualKey(Text key, Text value) {
23 return null;
24 }
25 } |
清单1:MultipleTextOutputFormat子类样例
MultipleTextOutputFormatByKey类的generateActualFileNameForKeyValue方法指定了作业输出的存储位置(第13行)。对于每组由Map/Reduce作业生成的键值对,该类会把键加入到路径名称中作为输出。“leaf”参数就是我们之前看到的“part-0000”,它在每个reducer中都是独一无二的,这样可以允许不同进程同时写入到输出目录而互不影响。例如,由第一个reducer产生的键为“France”、值为“soccer
5000”的结果会被写入到“output-directory/France/part-00000”内的某个文件中。
要使用这个类,需确保Hadoop包含了这个自定义类的jar,并使用完整的类名作为“-outputformat”的参数:
|
hadoop jar hadoop-streaming.jar -libjars CustomOutputFormats.jar \
-outputformat oddjob.hadoop.MultipleTextOutputFormatByKey \
-input search-logs \
-output search-frequency-by-country \
-mapper parse-logs.py \
-reducer count-searches.py |
清单1是oddjob项目中某个类的Java实现。oddjob是一个开源库,提供了多种MultipleTextOutputFormat。虽然这个库面向的是Hadoop的流特性,但是它也可以用在产生文本键值输出的其他作业中。
为服务准备输出
在我们的下一个例子中,必须实现两个接口来自定义数据序列化以及文件存放的目录结构,以使结果可被ElephantDB服务加载。正如前面所讨论的,序列化部分会由RecordWriter的实现来处理。在LineRecordWriter类将字节流写入输出文件的同时,ElephantRecordWriter还包含了专门的逻辑用来选择要写入的文件以及使用第三方库来格式化磁盘上的数据。
|
1 public class ElephantRecordWriter implements RecordWriter<IntWritable, ElephantRecordWritable> {
2
3 FileSystem _fs;
4 Args _args;
5 Map<Integer, LocalPersistence> _lps = new HashMap<Integer, LocalPersistence>();
6 Progressable _progressable;
7 LocalElephantManager _localManager;
8
9 int _numWritten = 0;
10 long _lastCheckpoint = System.currentTimeMillis();
11
12 public ElephantRecordWriter(Configuration conf, Args args, Progressable progressable)
throws IOException {
13 _fs = Utils.getFS(args.outputDirHdfs, conf);
14 _args = args;
15 _progressable = progressable;
16 _localManager = new LocalElephantManager(_fs, args.spec, args.persistenceOptions,
LocalElephantManager.getTmpDirs(conf));
17 }
18
19 private String remoteUpdateDirForShard(int shard) {
20 if(_args.updateDirHdfs==null) return null;
21 else return _args.updateDirHdfs + "/" + shard;
22 }
23
24 public void write(IntWritable shard, ElephantRecordWritable record) throws IOException {
25 LocalPersistence lp = null;
26 LocalPersistenceFactory fact = _args.spec.getLPFactory();
27 Map<String, Object> options = _args.persistenceOptions;
28 if(_lps.containsKey(shard.get())) {
29 lp = _lps.get(shard.get());
30 } else {
31 String updateDir = remoteUpdateDirForShard(shard.get());
32 String localShard = _localManager.downloadRemoteShard("" + shard.get(), updateDir);
33 lp = fact.openPersistenceForAppend(localShard, options);
34 _lps.put(shard.get(), lp);
35 progress();
36 }
37
38 _args.updater.updateElephant(lp, record.key, record.val);
39
40 _numWritten++;
41 if(_numWritten % 25000 == 0) {
42 long now = System.currentTimeMillis();
43 long delta = now - _lastCheckpoint;
44 _lastCheckpoint = now;
45 LOG.info("Wrote last 25000 records in " + delta + " ms");
46 _localManager.progress();
47 }
48 }
49
50 public void close(Reporter reporter) throws IOException {
51 for(Integer shard: _lps.keySet()) {
52 String lpDir = _localManager.localTmpDir("" + shard);
53 LOG.info("Closing LP for shard " + shard + " at " + lpDir);
54 _lps.get(shard).close();
55 LOG.info("Closed LP for shard " + shard + " at " + lpDir);
56 progress();
57 String remoteDir = _args.outputDirHdfs + "/" + shard;
58 if(_fs.exists(new Path(remoteDir))) {
59 LOG.info("Deleting existing shard " + shard + " at " + remoteDir);
60 _fs.delete(new Path(remoteDir), true);
61 LOG.info("Deleted existing shard " + shard + " at " + remoteDir);
62 }
63 LOG.info("Copying " + lpDir + " to " + remoteDir);
64 _fs.copyFromLocalFile(new Path(lpDir), new Path(remoteDir));
65 LOG.info("Copied " + lpDir + " to " + remoteDir);
66 progress();
67 }
68 _localManager.cleanup();
69 }
70
71 private void progress() {
72 if(_progressable!=null) _progressable.progress();
73 }
74 } |
清单2:从ElephantDB中摘录的某个RecordWriter子类
ElephantDB的工作方式是通过跨越若干个LocalPersistence对象(BerkeleyDB文件)来对数据进行分片(划分)。ElephantRecordWriter类中的write函数拿到分片ID,并检查该分片是否已经打开(第28行),如果没有则打开并创建一个新的本地文件(第33行)。第38行的updateElephant调用将作业输出的键值对写入到BerkeleyDB文件。
当关闭ElephantRecordWriter时,该类在第64行会复制BerkeleyDB文件到HDFS中,且可以随意选择是否覆盖旧文件。接下去的progress方法调用会通知Hadoop当前的RecordWriter正在按计划进行,这有点类似于真实Map/Reduce作业中的状态或计数器更新。
下一步是利用ElephantRecordWriter来实现OutputFormat。要理解此清单中的代码,重点是了解Hadoop
JobConf对象封装了什么。顾名思义,JobConf对象包含了某项作业的全部设置,包括输入输出目录,作业名称以及mapper和reducer类。清单3展示了两个自定义类是如何共同工作的:
|
1 public class ElephantOutputFormat implements OutputFormat<IntWritable, ElephantRecordWritable> {
2 public static Logger LOG = Logger.getLogger(ElephantOutputFormat.class);
3
4 public RecordWriter<IntWritable, ElephantRecordWritable> getRecordWriter(FileSystem fs, JobConf conf,
String string, Progressable progressable) throws IOException {
5 return new ElephantRecordWriter(conf, (Args) Utils.getObject(conf, ARGS_CONF), progressable);
6 }
7
8 public void checkOutputSpecs(FileSystem fs, JobConf conf) throws IOException {
9 Args args = (Args) Utils.getObject(conf, ARGS_CONF);
10 fs = Utils.getFS(args.outputDirHdfs, conf);
11 if(conf.getBoolean("mapred.reduce.tasks.speculative.execution", true)) {
12 throw new InvalidJobConfException("Speculative execution should be false");
13 }
14 if(fs.exists(new Path(args.outputDirHdfs))) {
15 throw new InvalidJobConfException("Output dir already exists " + args.outputDirHdfs);
16 }
17 if(args.updateDirHdfs!=null && !fs.exists(new Path(args.updateDirHdfs))) {
18 throw new InvalidJobConfException("Shards to update does not exist " + args.updateDirHdfs);
19 }
20 }
21 } |
清单3:从ElephantDB中摘录的某个OutputFormat实现
正如前面所看到的,OutputFormat有两个职责,分别是决定数据的存储位置以及数据写入的方式。ElephantOutputFormat的数据存储位置是通过检查JobConf以及在第14和17行检查确保该位置是一个合法目标位置后来决定的。至于数据的写入方式,则是由getRecordWriter函数处理,它的返回结果是清单2中的ElephantRecordWriter对象。
从Hadoop的角度来看,当Map/Reduce作业结束并且每个reducer产生了键值对流的时候,这些类会派上用场。Hadoop会以作业配置为参数调用checkOutputSpecs。如果函数运行没有抛出异常,它会接下去调用getRecordWriter以返回可以写入流数据的对象。当所有的键值对都被写入后,Hadoop会调用writer中的close函数,将数据提交到HDFS并结束该reducer的职责。
总结
OutputFormat是Hadoop框架中的重要组成部分。它们通过为目标消费应用程序产生合适的输出来提供与其他系统和服务间的互操作。自定义作业输出位置可以简化并加速数据工作流;而自定义结果输出方式可以让其快速地工作于其他不同的环境下。虽然实现OutputFormat和覆写几个方法一样简单,但是它足够灵活可以支持全新的磁盘上的数据格式。
|