不同场景的模式和示例
MapReduce 处理为处理和构建不同类型的查询创建了一整套新范例和结构。然而,要最充分地利用
Hadoop,意味着要编写合适的 MapReduce 查询来处理信息。本文介绍许多不同的场景,其中包含如何开发不同类型的查询的食谱式示例。
高级文本处理
处理文本是 MapReduce 流程的一种常见用法,因为文本处理相对复杂且是处理器资源密集的处理。基本的字数统计常常用于演示
Haddoop 处理大量文本和基本汇总大体内容的能力。
要获得字数,将文本从一个输入文件中拆分(使用一个基本的 string tokenizer)为各个包含计数的单词,并使用一个
Reduce 来计算每个单词的数量。例如,从短语 the quick brown fox jumps over
the lazy dog 中,Map 阶段生成清单 1 中的输出。
清单 1. Map 阶段的输出
the, 1 quick, 1 brown, 1 fox, 1 jumps, 1 over, 1 the, 1 lazy, 1 dog, 1 |
Reduce 阶段然后合计每个惟一的单词出现的次数,得到清单 2 中所示的输出。
清单 2. Reduce 阶段的输出
the, 2 quick, 1 brown, 1 fox, 1 jumps, 1 over, 1 lazy, 1 dog, 1 |
尽管此方法适用于基本的字数统计,但您常常希望识别重要的短语或单词的出现。例如,获取 Amazon 上对不同影片和视频的评论。
使用来自 Stanford University 大数据项目的信息,您可以下载影片评论数据(参见 参考资料)。该数据包含(Amazon
上报告的)原始评论的评分和有用性,如清单 3 中所示。
清单 3. 下载影片评论数据
product/productId: B003AI2VGA review/userId: A3QYDL5CDNYN66 review/profileName: abra "a devoted reader" review/helpfulness: 0/0 review/score: 2.0 review/time: 1229040000 review/summary: Pretty pointless fictionalization review/text: The murders in Juarez are real. This movie is a badly acted fantasy of revenge and holy intercession. If there is a good movie about Juarez, I don't know what it is, but it is not this one. |
请注意,尽管评论者给影片打了 2 分(1 为最差,5 为最好),但评论内容将此影片描述为一部非常差的影片。我们需要一个置信度评分,以便能够了解所给的评分与实际的评论是否彼此匹配。
许多工具可用于执行高级启发式分析,但基本的处理可使用一个简单的索引或正则表达式来实现。然后,我们可统计正面和负面正则表达式匹配数来获得一部影片的分数。
图 1. 统计正面和负面正则表达式匹配数来获得一部影片的分数
该图显示了如何从原始数据的单词分数来获得影片分数
对于 Map 部分,统计影片评论中各个单词或短语的数量,为正面和负面评价提供单个计数。Map 操作从产品评论中统计影片的分数,Reduce
操作然后按产品 ID 汇总这些分数,以提供正面或负面的评分。因此 Map 类似于清单 4。
清单 4. 为正面和负面评论提供单个计数的 Map 函数
// List of positive words/phrases static String[] pwords = {"good","excellent","brilliant movie"}; // List of negative words/phrases static String[] nwords = {"poor","bad","unwatchable"};
int count = 0;
for (String word : pwords) {
String REGEX = "\\b" + word + "\\b";
Pattern p = Pattern.compile(REGEX);
Matcher m = p.matcher(INPUT);
while(m.find()) {
count++;
}
for (String word : nwords) {
String REGEX = "\\b" + word + "\\b";
Pattern p = Pattern.compile(REGEX);
Matcher m = p.matcher(INPUT);
while(m.find()) {
count--;
}
}
output.collect(productId, count); |
Reduce 然后可像传统的内容求和那样计算。
清单 5. 按产品 ID 对正面和负面评论求和的 Reduce 函数
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key,
Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
} |
结果是评论的置信度分数。可以扩展单词列表来包含您想要匹配的短语。
读取和写入 JSON 数据
JSON 已成为一种实用的数据交换格式。它的实用性一定程度上源于它的简单性质和结构,以及在如此多的语言和环境中解析的轻松性。
在解析传入的 JSON 数据时,最常见的格式是每个符号输入行一条 JSON 记录。
清单 6. 每个符号输入行一条 JSON 记录
{ "productId" : "B003AI2VGA", "score": 2.0, "text" : """} { "productId" : "B007BI4DAT", "score": 3.4, "text" : """} { "productId" : "B006AI2FDH", "score": 4.1, "text" : """} |
此代码可通过使用合适的类(比如 GSON)将传入的字符串转换为 JSON 对象来轻松解析。将此方法用于
GSON 时,您将需要去序列化到一个预先确定的类中。
清单 7. 去序列化到一个预先确定的类中
class amazonRank { private String productId; private float score; private String text; amazonRank() { } } |
解析传入的文本,如下所示。
清单 8. 解析传入的文本
public void map(Object key, Text value, Context context) throws IOException, InterruptedException { try {
amazonRank rank = gson.fromJson(value.toString(),amazonRank.class);
... |
要写入 JSON 数据,可执行相反的操作。创建您想要与 MapReduce 定义内的 JSON 输出匹配的输出类,然后使用
GSON 类将此转换为此结构的一种 JSON 表示。
清单 9. 写入 JSON 数据
class recipeRecord { private String recipe; private String recipetext; private int recipeid; private float calories; private float fat; private float weight; recipeRecord() { } } |
现在您可在输出期间填充对象的一个实例,将它转换为单条 JSON 记录。
清单 10. 在输出期间填充对象的一个实例
recipeNutrition recipe = new recipeRecord(); recipe.recipeid = key.toString(); recipe.calories = sum;
Gson json = new Gson();
output.collect(key, new Text(json.toJson(recipe))); |
如果您要在 Hadoop 处理作业中使用一个第三方库,请确保将库 JAR 文件与 MapReduce
代码包含在一起:$ jar -cvf recipenutrition.jar -C recipenutrition/*
google-gson/gson.jar。
尽管在 Hadoop MapReduce 处理器之外,但另一种替代方案是使用 Jaql,它将直接解析并处理
JSON 数据。
合并数据集
一个 MapReduce 作业中通常执行 3 种类型的合并:
组合多个具有相同结构的文件的内容。
组合多个您想要组合的具有类似结构的文件的内容。
联接来自多个来源的与一个特定 ID 或关键词相关的数据。
第一个选项最好,在典型的 MapReduce 作业外部处理,因为它可使用 Hadoop Distributed
File System (HDFS) getmerge 操作或某个类似操作完成。此操作接受单个目录作为内容并输出到一个指定文件。例如,$
hadoop fs -getmerge srcfiles megafile 将 srcfiles 目录中的所有文件合并到一个文件中:megafile。
合并类似文件
要合并类似但不等同的文件,主要问题在于如何识别输入时使用的格式以及如何指定输出的格式。例如,给定文件
name, phone, count 和第二个文件 name, email, phone, count,您要负责确定哪个文件是正确的并执行
Map 来生成所需的结构。对于更复杂的记录,您可能需要在 Map 阶段对包含和不包含空值的字段执行更复杂的合并,以生成信息。
事实上,Hadoop 不是此过程的理想选择,除非您还将它作为简化、统计或化简信息的一个机会。也就是说,您识别传入记录的数量,有哪些可能的格式,并在您想要选择的字段上执行
Reduce。
联接
尽管有一些潜在的解决方案来执行联接,但它们常常依赖于以一种结构化方式处理信息,然后使用此结构确定对输出信息做什么。
举例而言,给定两条不同的信息线索(比如电子邮件地址、发送的电子邮件数量,以及接收的电子邮件地址数量),目的在于将数据合并到一种输出格式中。这是输入文件:email,
sent-count 和 email, received-count。输出应为此格式:email, sent-count,
received-count。
处理传入的文件并以不同方式输出内容,以便可以不同方式访问和生成文件和数据。然后依靠 Reduce 函数来执行化简。在大多数情况中,这将是一个多阶段过程:
一个阶段处理 “已发送的” 电子邮件,以 email, fake#sent 形式输出信息
注意:我们使用伪前缀来调整顺序,以便数据可按伪前缀来核对,而不按收到的前缀来核对。此做法允许数据按虚假、暗含的顺序联接。
一个阶段处理 “已发送的” 电子邮件,以 email, received 形式输出信息。
在 Map 函数读取文件时,它生成一些行。
清单 11. 生成行
dev@null.org,0#sent dev@null.org, received |
Map 识别输入记录并输出一个带一个键的统一版本。输出并生成 sent#received 结构来处理内容,确定该值应合并在一起还是汇总为一个单纯收到的值。
清单 12. 输出一个带一个键的统一版本
int sent = 0; int received = 0; for (Text val : values) { String strVal = val.toString(); buf.append(strVal).append(","); if (strVal.contains("#")) { String[] tokens = strVal.split("#"); // If the content contains a hash, assume it's sent and received int recvthis = Integer.parseInt(tokens[0]); int sentthis = Integer.parseInt(tokens[1]); received = received + Integer.parseInt(recvthis); sent = sent _ sentthis; } else { // Otherwise, it's just the received value received = received + Integer.parseInt(strVal); } } context.write(key, IntWritable(sendReplyCount), new IntWritable(receiveReplyCount)); |
在此情况下,我们依赖于 Hadoop 本身内的化简来按该键简化输出数据(在此情况下,该键为电子邮件地址),简化为我们需要的信息。因为该信息是以电子邮件为键,所以记录可以电子邮件为键来轻松地合并。
使用键的技巧
请记住,MapReduce 过程的一些方面可为我们所用。在本质上,MapReduce 是一个两阶段过程:
Map 阶段访问数据,挑选您需要的信息,然后输出该信息,使用一个键和关联的信息。
Reduce 阶段使用通用的键将映射的数据合并、汇总或统计为一种更简单的形式,从而简化数据。
键是一个重要的概念,因为它可用于以不同方式格式化和汇总数据。例如,如果您计划化简有关国家和城市人口的数据,可以仅输出一个键来按国家化简或汇总数据。
清单 13. 仅输出一个键
要按国家和城市汇总,键是二者的复合版本。
清单 14. 键是国家和城市的复合版本
France#Paris France#Lyon France#Grenoble United Kingdom#Birmingham United Kingdom#London |
这是一个基本的技巧,可在处理某些类型的数据时为我们所用(例如具有一个共同键的材料),因为我们可使用它模拟伪联接。此技巧在组合博客文章(拥有一个
blogpostid 以便于识别)和博客评论(拥有一个 blogpostid 和 blogcommentid)时也很有用。
要化简输出(例如统计博客和评论中的字数),我们首先通过 Map 处理博客文章和博客评论,但我们输出一个通用的
ID。
清单 15. 化简输出
blogpostid,the,quick,brown,fox blogpostid#blogcommentid,jumps,over,the,lazy,dog |
这会明显地使用两个键,将信息输出为两个不同的信息行。我们也可反转这一关系。我们可通过向每个单词添加评论
ID,从评论中针对 blogpostid 来识别单词。
清单 16. 反转关系
blogpostid,the,quick,brown,fox,jumps#blogcommentid,over#blogcommentid, the#blogcommentid,lazy#blogcommentid,dog#blogcommentid |
在处理期间,我们可通过查看 ID 而获知该单词是否附加到博客文章,以及它是否按该格式附加到博客文章或评论。
模拟传统的数据库操作
Hadoop 在真正意义上不是一个真正的数据库,这一定程度上是因为我们无法逐行执行更新、删除或插入。尽管这在许多情况下不是问题(您可对要处理的活动数据执行转储和加载),但有时您不希望导出并重新加载数据。
一种避免导出并重新加载数据的技巧是,创建一个变更文件,其中包含来自原始转储文件的一个差异列表。现在我们暂时忽略从
SQL 或其他数据库生成这些数据的过程。只要数据有一个惟一 ID,我们就可将它用作键,就可利用该键。下面来看一个类似于清单
17 的源文件。
清单 17. 源文件
1,London 2,Paris, 3,New York |
假设有一个类似于清单 18 的变更文件。
清单 18. 变更文件
1,DELETE 2,UPDATE,Munich 4,INSERT,Tokyo |
最终得出两个文件经过解析的合并结果,如清单 19 所示。
清单 19. 源文件和变更文件的合并
2,Munich 3,New York 4,Tokyo |
我们如何通过 Hadoop 实现这样一种合并?
使用 Hadoop 实现此合并的一种方式是,处理当前数据并将它转换为插入数据(因为它们都是插入目标文件中的新数据),然后将
UDPATE 操作转换为新数据的 DELETE 和 INSERT 操作。事实上,使用变更文件,通过将它修改为清单
20 中的内容更容易实现此目的。
清单 20. 通过 Hadoop 实现合并
1,DELETE 2,DELETE 2,INSERT,Munich 4,INSERT,Tokyo |
问题在于,我们无法对两个文件进行物理合并,但我们可相应地处理它们。如果它是一个原始的 INSERT 或
DELETE,我们会输出一个带有计数器的键。如果它是创建新插入数据的 UPDATE 操作,我们想要一个不会化简的不同的键,所以我们生成一个类似清单
21 的间隙 (interstitial) 文件。
清单 21. 生成间隙文件
1,1,London 2,1,Paris, 3,1,New York 1,-1,London 2,-1,Paris 2#NEW,Munich 4#NEW,1,Tokyo |
在 Reduce 期间,我们汇总每个惟一键的计数器的内容,生成清单 22。
清单 22. 汇总每个惟一键的计数器的内容
1,0,London 2,0,Paris, 3,1,New York 2#NEW,1,Munich 4#NEW,1,Tokyo |
我们然后可通过一个辅助 MapReduce 函数运行内容,使用清单 23 中所示的基本结构。
清单 23. 通过一个辅助 MapReduce 函数运行内容
map: if (key contains #NEW): emit(row) if (count >0 ): emit(row) |
辅助 MapReduce 会得到预期输出,如清单 24 中所示。
清单 24. 辅助 MapReduce 函数的预期输出
3,1,New York 2,Munich 4,1,Tokyo |
图 2 演示了这个首先格式化和化简、然后简化输出的两阶段过程。
图 2. 格式化、化简和简化输出的两阶段过程
原始数据在 Map 和 Reduce 阶段中得到化简和映射
这个过程需要比传统数据库中更多的工作,但它所提供解决方案需要的对不断更新的数据的交换简单得多。
结束语
本文介绍了许多使用 MapReduce 查询的不同场景。您看到了这些查询在处理各种数据上的强大功能,您现在应能够在自己的
MapReduce 解决方案中利用这些示例了。
|