编辑推荐: |
本文来自于csdn,文章讲解RDD的特点,RDD操作函数相关,穿插案例辣酱得段子,带大家理解MapReduce,通过哈姆雷特单词分析案例进行深度剖析。
|
|
RDD(Resilient Distributed Datasets弹性分布式数据集),是spark中最重要的概念,可以简单的把RDD理解成一个提供了许多操作接口的数据集合,和一般数据集不同的是,其实际数据分布存储于一批机器中(内存或磁盘中),RDD混合了各种计算模型,使得Spark可以应用于各种大数据处理场景当然,RDD肯定不会这么简单,它的功能还包括容错、集合内的数据可以并行处理等。RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。
RDD的特点
创建:只能通过转换 ( transformation ,如map/filter/groupBy/join
等,区别于动作 action) 从两种数据源中创建 RDD
只读:状态不可变,不能修改。
分区:支持使 RDD 中的元素根据那个 key 来分区 ( partitioning ) ,保存到多个结点上。还原时只会重新计算丢失分区的数据,而不会影响整个系统。
路径:即 RDD 有充足的信息关于它是如何从其他 RDD 产生而来的。
持久化:支持将会被重用的 RDD 缓存 ( 如 in-memory 或溢出到磁盘 )。
延迟计算: Spark 也会延迟计算 RDD ,使其能够将转换管道化 (pipeline transformation)。
操作:丰富的转换(transformation)和动作 ( action ) , count/reduce/collect/save
等。执行了多少次transformation操作,RDD都不会真正执行运算(记录lineage),只有当action操作被执行时,运算才会触发。
RDD 分为二类:transformation 和 action。
transformation 是从一个 RDD 转换为一个新的 RDD 或者从数据源生成一个新的 RDD;
action 是触发 job 的执行,只有在 action 被提交的时候才触发前面整个RDD的执行图
RDD运行逻辑
2.1一个段子理解MapReduce?
李耳我有三件宝贝,持有而珍重它。第一件叫慈爱,第二件叫节俭,第三件叫不敢处在众人之先
江湖传说永流传:谷歌技术有"三宝",GFS、MapReduce和大表(BigTable)!
辣酱段子
昨天,我在Xebia印度办公室发表了一个关于MapReduce的演说。演说进行得很顺利,听众们都能够理解MapReduce的概念(根据他们的反馈)。我成功地向技术听众们(解释了MapReduce的概念,这让我感到兴奋。在所有辛勤的工作之后,我们在Xebia印度办公室享用了丰盛的晚餐,然后我径直回了家。
回家后,我的妻子(Supriya)问道:“你的会开得怎么样?”我说还不错。 接着她又问我会议是的内容是什么(她不是从事软件或编程领域的工作的)。我告诉她说MapReduce。“Mapduce,那是什么玩意儿?”她问道:
“跟地形图有关吗?”我说不,不是的,它和地形图一点关系也没有。“那么,它到底是什么玩意儿?”妻子问道。
“唔…让我们去Dominos(披萨连锁)吧,我会在餐桌上跟你好好解释。” 妻子说:“好的。” 然后我们就去了披萨店。
我们在Domions点餐之后,柜台的小伙子告诉我们说披萨需要15分钟才能准备好。于是,我问妻子:“你真的想要弄懂什么是MapReduce?”
她很坚定的回答说“是的”。 因此我问道:
我: 你是如何准备洋葱辣椒酱的?(以下并非准确食谱,请勿在家尝试)
妻子: 我会取一个洋葱,把它切碎,然后拌入盐和水,最后放进混合研磨机里研磨。这样就能得到洋葱辣椒酱了。
妻子: 但这和MapReduce有什么关系?
我: 你等一下。让我来编一个完整的情节,这样你肯定可以在15分钟内弄懂MapReduce.
妻子: 好吧。
我:现在,假设你想用薄荷、洋葱、番茄、辣椒、大蒜弄一瓶混合辣椒酱。你会怎么做呢?
妻子: 我会取薄荷叶一撮,洋葱一个,番茄一个,辣椒一根,大蒜一根,切碎后加入适量的盐和水,再放入混合研磨机里研磨,这样你就可以得到一瓶混合辣椒酱了。
我: 没错,让我们把MapReduce的概念应用到食谱上。Map和Reduce其实是两种操作,我来给你详细讲解下。Map(映射):
把洋葱、番茄、辣椒和大蒜切碎,是各自作用在这些物体上的一个Map操作。所以你给Map一个洋葱,Map就会把洋葱切碎。
同样的,你把辣椒,大蒜和番茄一一地拿给Map,你也会得到各种碎块。 所以,当你在切像洋葱这样的蔬菜时,你执行就是一个Map操作。
Map操作适用于每一种蔬菜,它会相应地生产出一种或多种碎块,在我们的例子中生产的是蔬菜块。在Map操作中可能会出现有个洋葱坏掉了的情况,你只要把坏洋葱丢了就行了。所以,如果出现坏洋葱了,Map操作就会过滤掉坏洋葱而不会生产出任何的坏洋葱块。
Reduce(化简):在这一阶段,你将各种蔬菜碎都放入研磨机里进行研磨,你就可以得到一瓶辣椒酱了。这意味要制成一瓶辣椒酱,你得研磨所有的原料。因此,研磨机通常将map操作的蔬菜碎聚集在了一起。
妻子: 所以,这就是MapReduce?
我: 你可以说是,也可以说不是。 其实这只是MapReduce的一部分,MapReduce的强大在于分布式计算。
妻子: 分布式计算? 那是什么?请给我解释下吧。
我: 没问题。
我: 假设你参加了一个辣椒酱比赛并且你的食谱赢得了最佳辣椒酱奖。得奖之后,辣椒酱食谱大受欢迎,于是你想要开始出售自制品牌的辣椒酱。假设你每天需要生产10000瓶辣椒酱,你会怎么办呢?
妻子: 我会找一个能为我大量提供原料的供应商。
我:是的..就是那样的。那你能否独自完成制作呢?也就是说,独自将原料都切碎? 仅仅一部研磨机又是否能满足需要?而且现在,我们还需要供应不同种类的辣椒酱,像洋葱辣椒酱、青椒辣椒酱、番茄辣椒酱等等。
妻子: 当然不能了,我会雇佣更多的工人来切蔬菜。我还需要更多的研磨机,这样我就可以更快地生产辣椒酱了。
我:没错,所以现在你就不得不分配工作了,你将需要几个人一起切蔬菜。每个人都要处理满满一袋的蔬菜,而每一个人都相当于在执行一个简单的Map操作。每一个人都将不断的从袋子里拿出蔬菜来,并且每次只对一种蔬菜进行处理,也就是将它们切碎,直到袋子空了为止。
这样,当所有的工人都切完以后,工作台(每个人工作的地方)上就有了洋葱块、番茄块、和蒜蓉等等。 妻子:但是我怎么会制造出不同种类的番茄酱呢?
我:现在你会看到MapReduce遗漏的阶段—搅拌阶段。MapReduce将所有输出的蔬菜碎都搅拌在了一起,这些蔬菜碎都是在以key为基础的map操作下产生的。搅拌将自动完成,你可以假设key是一种原料的名字,就像洋葱一样。
所以全部的洋葱keys都会搅拌在一起,并转移到研磨洋葱的研磨器里。这样,你就能得到洋葱辣椒酱了。同样地,所有的番茄也会被转移到标记着番茄的研磨器里,并制造出番茄辣椒酱。
披萨终于做好了,她点点头说她已经弄懂什么是MapReduce了。我只希望下次她听到MapReduce时,能更好的理解我到底在做些什么。
网上其他人用最简短的语言解释MapReduce:
We want to count all the books in the library. You
count up shelf #1, I count up shelf #2. That’s map.
The more people we get, the faster it goes.
我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。
Now we get together and add our individual counts.
That’s reduce.
现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”
2.2RDD操作函数
transformation
action
函数具体功能介绍
<1>将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素,
输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区.
data = sc.textFile("/tmp/hive/tmp/1.txt")
data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1]
at textFile at :21
//使用map算子
mapresult = data.map()
mapresult: org.apache.spark.rdd.RDD[Array[String]]
= MapPartitionsRDD[2] at map at :23
|
<2>flatMap,属于Transformation算子,第一步和map一样,最后将所有的输出分区合并成一个
<3>count返回RDD中的元素数量
<4>reduce,根据映射函数对RDD里的元素进行二元计算结果,返回计算结果
rdd.reduce(lambda
x,y:x.a+y.a,x.b+y.b) |
<5>collect用于将一个RDD转换成数组
<6>countByKey(),countByKey用于统计RDD[K,V]中每个K的数量
<7>foreach(),foreach用于遍历RDD,将函数f应用于每一个元素?<8>saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中
rdd.saveAsTextFile("/tmp/hive/itcast/python-bigdata.txt") |
2.3哈姆雷特单词分析案例
hdfs文件操作
<1>将本地的Hamlet.txt上传到hdfs上
hadoop fs -put
Hamlet.txt /tmp/hive/itcast/ |
其他操作
获取hdfs文件到本地
hadoop fs -get
/tmp/hive/itcast/python.txt ./ |
列出hdfs文件系统根目录下的目录和文件
rm操作
hadoop fs -rm
< hdfs file > ...
hadoop fs -rm -r < hdfs dir>...
每次可以删除多个文件或目录 |
spark运行原理程序分析
Spark应用作为独立的进程运行,由驱动程序中的SparkContext协调。这个context将会连接到一些集群管理者(如YARN),这些管理者分配系统资源。集群上的每个worker由执行者(executor)管理,执行者反过来由SparkContext管理。执行者管理计算、存储,还有每台机器上的缓存。
sc
开启spark后,SparkContext简略写法sc,对于sc可以进行文件的读取以及节点之间的协调
<1>读取文件并转换成一个RDD数据集
sc.textFile("itcast-python.txt") |
<2>将数据从一个节点发送到其它节点上
#读取RDD类型的数据集,并且返回一个broadcast类型的数据
#读取RDD类型的数据集,并且返回一个broadcast类型的数据
bdata = sc.broadcast(data) |
from operator
import add
text = sc.textFile("/tmp/hive/itcast/Hamlet.txt")
def tokenize(text):
return text.split()
words = text.flatMap(tokenize)
wc = words.map(lambda x: (x,1))
counts = wc.reduceByKey(add)
counts.saveAsTextFile("/tmp/hive/itcast/hm")
|
|