求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Modeler   Code  
会员   
 
  
 
 
     
   
分享到
MapReduce编程基础
 

作者:qiang.xu,发布于2012-12-12,来源:博客园

 

<1>. WordCount示例及MapReduce程序框架

首先通过一个简单的程序来实际运行一个MapReduce程序,然后通过这个程序我们来哦那个结一下MapReduce编程模型。

下载源程序:/Files/xuqiang/WordCount.rar,将该程序打包成wordcount.jar下面的命令,随便写一个文本文件,这里是WordCountMrtrial,并上传到hdfs上,这里的路径是/tmp/WordCountMrtrial,运行下面的命令:

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ ./bin/hadoop jar wordcount.jar WordCount /tmp/WordCountMrtrial /tmp/result

如果该任务运行完成之后,将在hdfs的/tmp/result目录下生成类似于这样的结果:

gentleman 11

get 12

give 8

go 6

good 9

government 16

运行一个程序的基本上就是这样一个过程,我们来看看具体程序:

main函数中首先生成一个Job对象, Job job = new Job(conf, "word count");然后设置job的MapperClass,ReducerClass,设置输入文件路径FileInputFormat.addInputPath(job, new Path(otherArgs[0]));设置输出文件路径:FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));等待程序运行完成:System.exit(job.waitForCompletion(true) ? 0 : 1);可以看出main中仅仅是启动了一个job,然后设置该job相关的参数,具体实现MapReduce是mapper类和reducer类。

TokenizerMapper类中map函数将一行分割成<K2, V2>,然后IntSumReducer的reduce将<K2, list<V2>>转换成最终结果<K3, V3>。

通过这个示例基本上也能总结出简单的MapReduce编程的模型:一个Mapper类,一个Reducer类,一个Driver类。

<2>. MapReduce程序执行流程

这里所描述的执行流程更加注重是从程序的角度去理解,更加全面的流程可参考[这里]。

首先用户指定待处理的文件,在WordCount就是文件WordCountMrtrial,这是hadoop根据设定的InputDataFormat来将输入文件分割成一个record(key/value对),然后将这些record传递给map函数,在WordCount示例中,对应的record就是<line_number行号, line_content该行内容>;

然后map函数根据输入的record,形成<K2, V2>,在WordCount示例中形成<K2, V2>就是<single_word, word_count>,例如<"a", 1>;

如果map过程完成之后,hadoop将这些生成的<K2, V2>按照K2进行分组,形成<K2,list(V2) >,之后传递给reduce函数,在该函数中最终得到程序的输出结果<K3, V3>。

<3>. 深入学习MapReduce编程(1)

3.1 hadoop data types

由于在hadoop需要将key/value对序列化,然后通过网络network发送到集群中的其他机器上,所以说hadoop中的类型需要能够序列化。

具体而言,自定义的类型,如果一个类class实现了Writable interface的话,那么这个可以作为value类型,如果一个class实现了WritableComparable<T> interface的话,那么这个class可以作为value类型或者是key类型。

hadoop本身已经实现了一些预定义的类型predefined classes,并且这些类型实现了WritableComparable<T>接口。

3.2 Mapper

如果一个类想要成为一个mapper,那么该类需要实现Mapper接口,同时继承自MapReduceBase。在MapReduceBase类中,两个方法是特别需要注意的:

void configure( JobConf job):这个方法是在任务被运行之前调用

void close():在任务运行完成之后调用

剩下的工作就是编写map方法,原型如下:

void map(Object key, Text value, Context context

) throws IOException, InterruptedException;

这个方法根据<K1, V1>生成<K2, V2>,然后通过context输出。

同样的在hadoop中预先定义了如下的Mapper:

3.3 Reducer

如果一个类想要成为Reducer的话,需要首先实现Reducer接口,然后需要继承自MapReduceBase。

当reducer接收从mapper传递而来的key/value对,然后根据key来排序,分组,最终生成<K2, list<V2>> ,然后reducer根据<K2, list<V2>>生成<K3, V3>.

同样在hadoop中预定义了一些Reducer:

3.4 Partitioner

Partitioner的作用主要是将mapper运行的结果“导向directing”到reducer。如果一个类想要成为Partitioner,那么需要实现Partitioner接口,该接口继承自JobConfigurable,定义如下:

public interface Partitioner<K2, V2> extends JobConfigurable {
  /** 
   * Get the paritition number for a given key (hence record) given the total 
   * number of partitions i.e. number of reduce-tasks for the job.
   *   
   * <p>Typically a hash function on a all or a subset of the key.</p>
   *
   * @param key the key to be paritioned.
   * @param value the entry value.
   * @param numPartitions the total number of partitions.
   * @return the partition number for the <code>key</code>.
   */
  int getPartition(K2 key, V2 value, int numPartitions);
}

hadoop将根据方法getPartition的返回值确定将mapper的值发送到那个reducer上。返回值相同的key/value对将被“导向“至同一个reducer。

3.5 Input Data Format and Output Data Format

3.5.1 Input Data Format

上面我们的假设是MapReduce程序的输入是key/value对,也就是<K1, V1>,但是实际上一般情况下MapReduce程序的输入是big file的形式,那么如何将这个文件转换成<K1, V1>,即file -> <K1, V1>。这就需要使用InputFormat接口了。

下面是几个常用InputFormat的实现类:

当然除了使用hadoop预先定义的InputDataFormat之外,还可以自定义,这是需要实现InputFormat接口。该接口仅仅包含两个方法:

InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;该接口实现将大文件分割成小块split。

RecordReader<K, V> getRecordReader(InputSplit split,

JobConf job,

Reporter reporter) throws IOException;

该方法输入分割成的split,然后返回RecordReader,通过RecordReader来遍历该split内的record。

3.5.2 Output Data Format

每个reducer将自己的输出写入到结果文件中,这是使用output data format来配置输出的文件的格式。hadoop预先实现了:

3.6 Streaming in Hadoop

3.6.1 执行流程

我们知道在linux中存在所谓的“流”的概念,也就是说我们可以使用下面的命令:

cat input.txt | RandomSample.py 10 >sampled_output.txt

同样在hadoop中我们也可以使用类似的命令,显然这样能够在很大程度上加快程序的开发进程。下面来看看hadoop中流执行的过程:

hadoop streaming从标砖输入STDIN读取数据,默认的情况下使用\t来分割每行,如果不存在\t的话,那么这时正行的内容将被看作是key,而此时的value内容为空;

然后调用mapper程序,输出<K2, V2>;

之后,调用Partitioner来将<K2, V2>输出到对应的reducer上;

reducer根据输入的<K2, list(V2)> 得到最终结果<K3, V3>并输出到STDOUT上。

3.6.2 简单示例程序

下面我们假设需要做这样一个工作,输入一个文件,文件中每行是一个数字,然后得到该文件中的数字的最大值(当然这里可以使用streaming中自带的Aggregate)。 首先我们编写一个python文件(如果对python不是很熟悉,看看[这里]):

3.6.2.1 准备数据

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.cs.brandeis.edu" >url1

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.nytimes.com" >url2

上传到hdfs上:

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -mkdir urls

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -put url1 urls/

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -put url2 urls/

3.6.2.2 编写mapper multifetch.py

#!/usr/bin/env python
import sys, urllib, re
title_re = re.compile("<title>(.*?)</title>",
        re.MULTILINE | re.DOTALL | re.IGNORECASE)
for line in sys.stdin:
    # We assume that we are fed a series of URLs, one per line
    url = line.strip()
    # Fetch the content and output the title (pairs are tab-delimited)
    match = title_re.search(urllib.urlopen(url).read())
    if match:
        print url, "\t", match.group(1).strip()

该文件的主要作用是给定一个url,然后输出该url代表的html页面的title部分。

在本地测试一下该程序:

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.cs.brandeis.edu" >urls

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.nytimes.com" >>urls

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ sudo chmod u+x ./multifetch.py

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ cat urls | ./multifetch.py 将输出:

http://www.cs.brandeis.edu Computer Science Department | Brandeis University

http://www.nytimes.com The New York Times - Breaking News, World News &amp; Multimedia

3.6.2.3 编写reducer reducer.py

编写reducer.py文件

#!/usr/bin/env python
from operator import itemgetter
import sys
for line in sys.stdin:
    line = line.strip()
    print line 

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ chmod u+x ./reducer.py

现在我们的mapper和reducer已经准备好了,那么首先在本地上运行测试一下程序的功能,下面的命令模拟在hadoop上运行的过程:

首先mapper从stdin读取数据,这里是一行;

然后读取该行的内容作为一个url,然后得到该url代表的html的title的内容,输出<url, url-title-content>;

调用sort命令将mapper输出排序;

将排序完成的结果交给reducer,这里的reducer仅仅是将结果输出。

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ cat urls | ./multifetch.py | sort | ./reducer.py 
 http://www.cs.brandeis.edu     Computer Science Department | Brandeis University
 http://www.nytimes.com     The New York Times - Breaking News, World News & Multimedia  

显然程序能够正确

3.6.2.4 在hadoop streaming上运行

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop jar ./mapred/contrib/streaming/hadoop-0.21.0-streaming.jar \
 > -mapper /home/xuqiang/hadoop/src/hadoop-0.21.0/multifetch.py \
 > -reducer /home/xuqiang/hadoop/src/hadoop-0.21.0/reducer.py \
 > -input urls/* \
 
> -output titles  

程序运行完成之后,查看运行结果:

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -cat titles/part-00000

相关文章 相关文档 相关视频



我们该如何设计数据库
数据库设计经验谈
数据库设计过程
数据库编程总结
数据库性能调优技巧
数据库性能调整
数据库性能优化讲座
数据库系统性能调优系列
高性能数据库设计与优化
高级数据库架构师
数据仓库和数据挖掘技术
Hadoop原理、部署与性能调优
 
分享到
 
 


MySQL索引背后的数据结构
MySQL性能调优与架构设计
SQL Server数据库备份与恢复
让数据库飞起来 10大DB2优化
oracle的临时表空间写满磁盘
数据库的跨平台设计
更多...   


并发、大容量、高性能数据库
高级数据库架构设计师
Hadoop原理与实践
Oracle 数据仓库
数据仓库和数据挖掘
Oracle数据库开发与管理


GE 区块链技术与实现培训
航天科工某子公司 Nodejs高级应用开发
中盛益华 卓越管理者必须具备的五项能力
某信息技术公司 Python培训
某博彩IT系统厂商 易用性测试与评估
中国邮储银行 测试成熟度模型集成(TMMI)
中物院 产品经理与产品管理
更多...