MapReduce与HDFS简介
什么是Hadoop?
Google为自己的业务需要提出了编程模型MapReduce和分布式文件系统Google File System,并发布了相关论文(可在Google
Research的网站上获得: GFS 、 MapReduce)。 Doug Cutting和Mike
Cafarella在开发搜索引擎Nutch时对这两篇论文做了自己的实现,即同名的MapReduce和HDFS,合起来就是Hadoop。
MapReduce的Data flow如下图,原始数据经过mapper处理,再进行partition和sort,到达reducer,输出最后结果。
图片来自Hadoop: The Definitive
Guide
Hadoop Streaming原理
Hadoop本身是用Java开发的,程序也需要用Java编写,但是通过Hadoop Streaming,我们可以使用任意语言来编写程序,让Hadoop运行。
Hadoop Streaming的相关源代码可以在Hadoop的Github
repo 查看。简单来说,就是通过将用其他语言编写的mapper和reducer通过参数传给一个事先写好的Java程序(Hadoop自带的*-streaming.jar),这个Java程序会负责创建MR作业,另开一个进程来运行mapper,将得到的输入通过stdin传给它,再将mapper处理后输出到stdout的数据交给Hadoop,partition和sort之后,再另开进程运行reducer,同样地通过stdin/stdout得到最终结果。因此,我们只需要在其他语言编写的程序里,通过stdin接收数据,再将处理过的数据输出到stdout,Hadoop
streaming就能通过这个Java的wrapper帮我们解决中间繁琐的步骤,运行分布式程序。
图片来自Hadoop: The Definitive
Guide
原理上只要是能够处理stdio的语言都能用来写mapper和reducer,也可以指定mapper或reducer为Linux下的程序(如awk、grep、cat)或者按照一定格式写好的java
class。因此,mapper和reducer也不必是同一类的程序。
Hadoop Streaming的优缺点
优点
可以使用自己喜欢的语言来编写MapReduce程序(换句话说,不必写Java
XD)
不需要像写Java的MR程序那样import一大堆库,在代码里做一大堆配置,很多东西都抽象到了stdio上,代码量显著减少
因为没有库的依赖,调试方便,并且可以脱离Hadoop先在本地用管道模拟调试
缺点
只能通过命令行参数来控制MapReduce框架,不像Java的程序那样可以在代码里使用API,控制力比较弱,有些东西鞭长莫及
因为中间隔着一层处理,效率会比较慢
所以Hadoop Streaming比较适合做一些简单的任务,比如用python写只有一两百行的脚本。如果项目比较复杂,或者需要进行比较细致的优化,使用Streaming就容易出现一些束手束脚的地方。
用python编写简单的Hadoop Streaming程序
这里提供两个例子:
1.Michael Noll的word count程序
2.Hadoop: The Definitive Guide里的例程
使用python编写Hadoop Streaming程序有几点需要注意:
1.在能使用iterator的情况下,尽量使用iterator,避免将stdin的输入大量储存在内存里,否则会严重降低性能
2.streaming不会帮你分割key和value传进来,传进来的只是一个个字符串而已,需要你自己在代码里手动调用split()
3.从stdin得到的每一行数据末尾似乎会有\n,保险起见一般都需要使用rstrip()来去掉
4.在想获得K-V list而不是一个个处理key-value pair时,可以使用groupby配合itemgetter将key相同的k-v
pair组成一个个group,得到类似Java编写的reduce可以直接获取一个Text类型的key和一个iterable作为value的效果。注意itemgetter的效率比lambda表达式要高,所以如果需求不是很复杂的话,尽量用itemgetter比较好。
我在编写Hadoop Streaming程序时的基本模版是
#!/usr/bin/env python # -*- coding: utf-8 -*- """ Some description here... """
import sys
from operator import itemgetter
from itertools import groupby
def read_input(file):
"""Read input and split."""
for line in file:
yield line.rstrip().split('\t')
def main():
data = read_input(sys.stdin)
for key, kviter in groupby(data, itemgetter(0)):
# some code here..
if __name__ == "__main__":
main()
|
如果对输入输出格式有不同于默认的控制,主要会在read_input()里调整。
本地调试
本地调试用于Hadoop Streaming的python程序的基本模式是:
$ cat <input path> | python <path to mapper script> | sort -t $'\t' -k1,1 | python <path to reducer script> > <output path> |
或者如果不想用多余的cat,也可以用<定向
$ python <path to mapper script> < <input path> | sort -t $'\t' -k1,1 | python <path to reducer script> > <output path> |
这里有几点需要注意:
1.Hadoop默认按照tab来分割key和value,以第一个分割出的部分为key,按key进行排序,因此这里使用
来模拟。如果你有其他需求,在交给Hadoop Streaming执行时可以通过命令行参数调,本地调试也可以进行相应的调整,主要是调整sort的参数。因此为了能够熟练进行本地调试,建议先掌握sort命令的用法。
2.如果你在python脚本里加上了shebang,并且为它们添加了执行权限,也可以用类似于
来代替
二、在集群上运行与监控
为了方便,这篇文章里的例子均为伪分布式运行,一般来说只要集群配置得当,在伪分布式下能够运行的程序,在真实集群上也不会有什么问题。
为了更好地模拟集群环境,我们可以在mapred-site.xml中增设reducer和mapper的最大数目(默认为2,实际可用数目大约是CPU核数-1)。
假设你为Hadoop安装路径添加的环境变量叫$HADOOP_HOME(如果是$HADOOP_PREFIX,下文看到的命令对应改改就行)
$ vi $HADOOP_HOME/conf/mapred-site.xml
|
假设这台机子的CPU是4核,那么可以添加下面这几行
<property> <name>mapred.tasktracker.reduce.tasks.maximum</name> <value>3</value> </property> <property> <name>mapred.tasktracker.map.tasks.maximum</name> <value>3</value> </property>
|
这里修改了以后只是增加了slot的数量,如果你在执行MR作业时没有指明需要多少mapper或reducer,不一定会用到这么多,Hadoop会自行分配。
查看文档
首先需要知道用于streaming的java程序在哪里。在1.0.x的版本中,应该都在$HADOOP_HOME/contrib/streaming/下。比如1.0.4的就在
$HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar
|
里。 首先我们可以先看看这个java程序自带的文档。以下以1.0.4版本为例,执行
$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar -info
|
就会看到一系列自带的帮助,带有各种参数的说明和一些使用样例。
运行命令
用Hadoop Streaming执行python程序的一般步骤是:
1.将输入文件放到HDFS上,建议使用copyFromLocal而不是put命令,参见Difference
between hadoop fs -put and hadoop fs -copyFromLocal
1.一般可以新建一个文件夹用于存放输入文件,假设叫input
然后用
查看目录,可以看到出现了一个/user/hadoop/input文件夹。/user/hadoop是默认的用户文件夹,相当于本地文件系统中的/home/hadoop。
2.再使用
$ hadoop fs -copyFromLocal <PATH TO LOCAL FILE(S)> input/
|
将本地文件放到input文件夹下。copyFromLocal命令的用法类似于Linux的cp命令,支持使用wildcard。如果出现了预期外的执行结果,可以试试看在使用了wildcard的路径外加上引号。
2.开始MR作业,以1.0.4版本为例,假设你现在正在放有mapper和reducer两个脚本的目录下,而且它们刚好就叫mapper.py和reducer.py,在不需要做其他配置的情况下,执行
$hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar \ -mapper mapper.py -file mapper.py\ -reducer reducer.py -file reducer.py \ -input input/* -output output |
第一行是告诉Hadoop运行streaming的java程序,接下来的是参数:
这里的mapper.py和reducer.py是mapper所在python程序的路径。为了让Hadoop将程序分发给其他机器,需要再加一个-file参数用于指明要分发的程序放在哪里。
注意这样写的前提是这个python程序里有shebang而且添加了执行权限。如果没有的话,可以改成
-mapper 'python mapper.py' |
加上解释器命令,用引号括住。因为准确来说,mapper后面跟的其实应该是一个命令而不是一个文件名。
假如你执行的程序不放在当前目录下,比如说在当前目录的src文件夹下,可以这样写
-mapper mapper.py -file src/mapper.py\ -reducer reducer.py -file src/reducer.py \
|
也就是说,-mapper和-reducer后面跟的文件名不需要带上路径,而-file后的参数则需要。注意如果你在mapper后的命令用了引号,加上路径名反而会报错说找不到这个程序。
-input和-output后面跟的是HDFS上的路径名,同样支持wildcard,这里的input/*指的就是“input文件夹下的所有文件”。注意-output后面跟着的需要是一个不存在于HDFS上的路径,在产生输出的时候hadoop会帮你创建这个文件夹,如果已经存在的话就会产生冲突。
有时候shebang不一定能用,尤其是在执行环境比较复杂的时候。最保险的写法可能是:
这里有点要特别说明的地方$hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar \ -mapper 'python mapper.py' -file mapper.py\ -reducer 'python reducer.py' -file reducer.py \ -input input/* -output output
|
这样写还有一个好处,就是可以在引号里写上提供给python程序的命令行参数,以后的教程会提到怎么用。
由于mapper和reducer参数跟的实际上是命令,所以如果每台机器上python的环境配置不一样的话,会用每台机器自己的配置去执行python程序。
运行过程
写完命令回车,顺利的话会开始执行程序。 这里不赘述执行时输出到终端的内容,可以去这里看看正常运行的时候会给些什么。
利用WebUI监控集群运行情况
一般来说要检查运行状况,都是去jobtracker的webUI。如果在master上,用浏览器访问http://localhost:50030即可(如果你在配置hadoop的时候修改了mapred-site.xml的mapred.job.tracker.http.address,请访问对应的其他地址)
在webUI里你可以看到running jobs, completed jobs和retired jobs。点击Jobid下的超链接,可以看到对应job的执行状况。进去后如果看到Failed/Killed
Task Attempts下非空,你可以点进对应的超链接,找到对应的log去进行debug。
得到结果
成功执行完这个任务之后,你用output参数在HDFS上指定的输出文件夹里就会多出几个文件
一个空白文件_SUCCESS,表明job运行成功,这个文件可以让其他程序只要查看一下HDFS就能判断这次job是否成功运行,从而进行相关处理。
一个_logs文件夹,顾名思义里面放着任务日志
part-00000, .... part-xxxxx文件,有多少个reducer后面的数字就会有多大,对应每个reducer的输出结果。
假如你的输出很少,比如是一个只有几行的计数,你可以用
$ hadoop fs -cat <PATH ON HDFS>
|
直接将输出打印到终端查看。
假如你的输出很多,则需要拷贝到本地文件系统来查看。可以使用copyToLocal来获取整个文件夹(与copyFromLocal一样,它与get的区别在于会限制目标文件夹一定在本地文件系统上)。如果你不需要_SUCCESS
和_logs,并且想要将所有reducer的输出合并,可以使用getmerge命令。
比如在上面的例子里,可以用命令
$ hadoop fs -copyToLocal output ./
|
将output文件夹复制到本地文件系统的当前目录下,或者用
$ hadoop fs -getmerge output ./
|
将output下的part-xxxxx合并,放到当前目录的一个叫output的文件里。
如何串联多趟MR
如果你有多次任务要执行,下一步需要用上一步的任务做输入,解决办法其实很简单。假设上一步在HDFS的输出文件夹是output1,那么在下一步的运行命令中,指明
即指定上一次的所有输出为本次任务的输入即可。注意这里假设你不需要对上一步的输出做额外处理。
其他
这篇文章只提到了最简单的执行Hadoop streaming程序的方法。涉及到一些其他需求,比如需要有多个输入文件等情况,还需要进一步调整运行命令,会在以后的文章里讲到。
三、 自定义功能
使用额外的文件
假如你跑的job除了输入以外还需要一些额外的文件(side data),有两种选择:
1.大文件
所谓的大文件就是大小大于设置的local.cache.size的文件,默认是10GB。这个时候可以用-file来分发。除此之外代码本身也可以用file来分发。
格式:假如我要加多一个sideData.txt给python脚本用:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -input iputDir \ -output outputDir \ -mapper mapper.py \ -file mapper.py \ -reducer reduer.py \ -file reducer.py \ -file sideDate.txt
|
在python脚本里,只要把这个文件当成自己同一目录下的本地文件来打开就可以了。比如:
注意这个file是只读的,不可以写。
2.小文件
如果是比较小的文件,想要提高读写速度可以将它放在distributed cache里(也就是每台机器都有自己的一份copy,不需要网络IO就可以拿到数据)。这里要用到的参数是-cachefile,写法和用法上一个一样,就是-file改成-cachefile而已。
控制partitioner
partitioning指的是数据经过mapper处理后,被分发到reducer上的过程。partitioner控制的,就是“怎样的mapper输出会被分发到哪一个reducer上”。
Hadoop有几个自带的partitioner,解释可以看这里。默认的是HashPartitioner,也就是把第一个tab前的key做hash之后用于分配partition。写Hadoop
Streaming程序是可以选择其他partitioner的,你可以选择自带的其他几种里的一种,也可以自己写一个继承Partitioner的java类然后编译成jar,在运行参数里指定为你用的partitioner。
官方自带的partitioner里最常用的是KeyFieldBasedPartitioner(源代码可以看这里)。它会按照key的一部分来做partition,而不是用整个key来做partition。
在学会用KeyFieldBasedPartitioner之前,必然要先学怎么控制key-value的分割。分割key的步骤可以分为两步,用python来描述一下大约是
fields = output.split(seperator) key = fields[:numKeyfields]
|
1.选择用什么符号来分割key,也就是选择seperator
map.output.key.field.separator可以指定用于分隔key的符号。比如指定为一点的话,就要加上参数
-D stream.map.output.field.separator=.
|
假设你的mapper输出是
这时会先看准[11, 22, 33, 44]这里的其中一个或几个作为key
2.选择key的范围,也就是选择numKeyfields
控制key的范围的参数是这个,假设我要设置被分割出的前2个元素为key:
-D stream.num.map.output.key.fields=2 |
那么key就是上面的 1122。值得注意的是假如这个数字设置到覆盖整个输出,在这个例子里是4的话,那么整一行都会变成key。
上面分割出key之后, KeyFieldBasedPartitioner还需要知道你想要用key里的哪部分作为partition的依据。它进行配置的过程可以看源代码来理解。
假设在上一步我们通过使用
-D stream.map.output.field.separator=. \ -D stream.num.map.output.key.fields=4 \
|
将11.22.33.44的整个字符串都设置成了key,下一步就是在这个key的内部再进行一次分割。map.output.key.field.separator可以用来设置第二次分割用的分割符,mapred.text.key.partitioner.options可以接受参数来划分被分割出来的partition
key,比如:
-D map.output.key.field.separator=. \ -D mapred.text.key.partitioner.options=-k1,2 \
|
指的就是在key的内部里,将第1到第2个被点分割的元素作为partition key,这个例子里也就是1122。这里的值-ki,j表示从i到j个元素(inclusive)会作为partition
key。如果终点省略不写,像-ki的话,那么i和i之后的元素都会作为partition key。
partition key相同的输出会保证分到同一个reducer上,也就是所有11.22.xx.xx的输出都会到同一个partitioner,11.22换成其他各种组合也是一样。
实例说明一下,就是这样的:
1.mapper的输出是
11.12.1.2 11.14.2.3 11.11.4.1 11.12.1.1 11.14.2.2 |
2.指定前4个元素做key,key里的前两个元素做partition key,分成3个partition的话,就会被分成
11.11.4.1 ----------- 11.12.1.2 11.12.1.1 ----------- 11.14.2.3 11.14.2.2
|
3.下一步reducer会对自己得到的每个partition内进行排序,结果就是
11.11.4.1 ----------- 11.12.1.1 11.12.1.2 ----------- 11.14.2.2 11.14.2.3 |
命令格式大约就是长这样
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -D stream.map.output.field.separator=. \ -D stream.num.map.output.key.fields=4 \ -D map.output.key.field.separator=. \ -D mapred.text.key.partitioner.options=-k1,2 \ -input inputDir \ -output outputDir \ -mapper mapper.py -file mapper.py \ -reducer reducer.py -file reducer.py \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
|
注意-D参数放在前面,指定用KeyFieldBasedPartitioner的-partitioner要放在下面。
控制comparator与自定义排序
上面说到mapper的输出被partition到各个reducer之后,会有一步排序。这个排序的标准也是可以通过设置comparator控制的。和上面一样,要先设置分割出key用的分隔符、key的范围,key内部分割用的分隔符
-D stream.map.output.field.separator=. \ -D stream.num.map.output.key.fields=4 \ -D map.output.key.field.separator=. \ |
这里要控制的就是key内部的哪些元素用来做排序依据,是排字典序还是数字序,倒序还是正序。用来控制的参数是mapred.text.key.comparator.options,接受的值格式类似于unix
sort。比如我要按第二个元素的数字序(默认字典序)+倒序来排元素的话,就用-D mapred.text.key.comparator.options=-k2,2nr
n表示数字序,r表示倒序。这样一来
11.12.1.2 11.14.2.3 11.11.4.1 11.12.1.1 11.14.2.2
|
就会被排成
11.14.2.3 11.14.2.2 11.12.1.2 11.12.1.1 11.11.4.1
|
|