您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Model Center   Code  
会员   
   
 
     
   
 订阅
  捐助
Python学习笔记——大数据之SPARK核心
 
   次浏览      
 2019-9-16 
 
编辑推荐:

本文来自于csdn,文章讲解RDD的特点,RDD操作函数相关,穿插案例辣酱得段子,带大家理解MapReduce,通过哈姆雷特单词分析案例进行深度剖析。

RDD(Resilient Distributed Datasets弹性分布式数据集),是spark中最重要的概念,可以简单的把RDD理解成一个提供了许多操作接口的数据集合,和一般数据集不同的是,其实际数据分布存储于一批机器中(内存或磁盘中),RDD混合了各种计算模型,使得Spark可以应用于各种大数据处理场景当然,RDD肯定不会这么简单,它的功能还包括容错、集合内的数据可以并行处理等。RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。

RDD的特点

创建:只能通过转换 ( transformation ,如map/filter/groupBy/join 等,区别于动作 action) 从两种数据源中创建 RDD

只读:状态不可变,不能修改。

分区:支持使 RDD 中的元素根据那个 key 来分区 ( partitioning ) ,保存到多个结点上。还原时只会重新计算丢失分区的数据,而不会影响整个系统。

路径:即 RDD 有充足的信息关于它是如何从其他 RDD 产生而来的。

持久化:支持将会被重用的 RDD 缓存 ( 如 in-memory 或溢出到磁盘 )。

延迟计算: Spark 也会延迟计算 RDD ,使其能够将转换管道化 (pipeline transformation)。

操作:丰富的转换(transformation)和动作 ( action ) , count/reduce/collect/save 等。执行了多少次transformation操作,RDD都不会真正执行运算(记录lineage),只有当action操作被执行时,运算才会触发。

RDD 分为二类:transformation 和 action。

transformation 是从一个 RDD 转换为一个新的 RDD 或者从数据源生成一个新的 RDD;

action 是触发 job 的执行,只有在 action 被提交的时候才触发前面整个RDD的执行图

RDD运行逻辑

2.1一个段子理解MapReduce?

李耳我有三件宝贝,持有而珍重它。第一件叫慈爱,第二件叫节俭,第三件叫不敢处在众人之先

江湖传说永流传:谷歌技术有"三宝",GFS、MapReduce和大表(BigTable)!

辣酱段子

昨天,我在Xebia印度办公室发表了一个关于MapReduce的演说。演说进行得很顺利,听众们都能够理解MapReduce的概念(根据他们的反馈)。我成功地向技术听众们(解释了MapReduce的概念,这让我感到兴奋。在所有辛勤的工作之后,我们在Xebia印度办公室享用了丰盛的晚餐,然后我径直回了家。

 回家后,我的妻子(Supriya)问道:“你的会开得怎么样?”我说还不错。 接着她又问我会议是的内容是什么(她不是从事软件或编程领域的工作的)。我告诉她说MapReduce。“Mapduce,那是什么玩意儿?”她问道: “跟地形图有关吗?”我说不,不是的,它和地形图一点关系也没有。“那么,它到底是什么玩意儿?”妻子问道。 “唔…让我们去Dominos(披萨连锁)吧,我会在餐桌上跟你好好解释。” 妻子说:“好的。” 然后我们就去了披萨店。

我们在Domions点餐之后,柜台的小伙子告诉我们说披萨需要15分钟才能准备好。于是,我问妻子:“你真的想要弄懂什么是MapReduce?” 她很坚定的回答说“是的”。 因此我问道:

  我: 你是如何准备洋葱辣椒酱的?(以下并非准确食谱,请勿在家尝试)

  妻子: 我会取一个洋葱,把它切碎,然后拌入盐和水,最后放进混合研磨机里研磨。这样就能得到洋葱辣椒酱了。

  妻子: 但这和MapReduce有什么关系?

  我: 你等一下。让我来编一个完整的情节,这样你肯定可以在15分钟内弄懂MapReduce.

  妻子: 好吧。

  我:现在,假设你想用薄荷、洋葱、番茄、辣椒、大蒜弄一瓶混合辣椒酱。你会怎么做呢?

  妻子: 我会取薄荷叶一撮,洋葱一个,番茄一个,辣椒一根,大蒜一根,切碎后加入适量的盐和水,再放入混合研磨机里研磨,这样你就可以得到一瓶混合辣椒酱了。

  我: 没错,让我们把MapReduce的概念应用到食谱上。Map和Reduce其实是两种操作,我来给你详细讲解下。Map(映射): 把洋葱、番茄、辣椒和大蒜切碎,是各自作用在这些物体上的一个Map操作。所以你给Map一个洋葱,Map就会把洋葱切碎。 同样的,你把辣椒,大蒜和番茄一一地拿给Map,你也会得到各种碎块。 所以,当你在切像洋葱这样的蔬菜时,你执行就是一个Map操作。 Map操作适用于每一种蔬菜,它会相应地生产出一种或多种碎块,在我们的例子中生产的是蔬菜块。在Map操作中可能会出现有个洋葱坏掉了的情况,你只要把坏洋葱丢了就行了。所以,如果出现坏洋葱了,Map操作就会过滤掉坏洋葱而不会生产出任何的坏洋葱块。

  Reduce(化简):在这一阶段,你将各种蔬菜碎都放入研磨机里进行研磨,你就可以得到一瓶辣椒酱了。这意味要制成一瓶辣椒酱,你得研磨所有的原料。因此,研磨机通常将map操作的蔬菜碎聚集在了一起。

  妻子: 所以,这就是MapReduce?

  我: 你可以说是,也可以说不是。 其实这只是MapReduce的一部分,MapReduce的强大在于分布式计算。

  妻子: 分布式计算? 那是什么?请给我解释下吧。

  我: 没问题。

  我: 假设你参加了一个辣椒酱比赛并且你的食谱赢得了最佳辣椒酱奖。得奖之后,辣椒酱食谱大受欢迎,于是你想要开始出售自制品牌的辣椒酱。假设你每天需要生产10000瓶辣椒酱,你会怎么办呢?

  妻子: 我会找一个能为我大量提供原料的供应商。

  我:是的..就是那样的。那你能否独自完成制作呢?也就是说,独自将原料都切碎? 仅仅一部研磨机又是否能满足需要?而且现在,我们还需要供应不同种类的辣椒酱,像洋葱辣椒酱、青椒辣椒酱、番茄辣椒酱等等。

  妻子: 当然不能了,我会雇佣更多的工人来切蔬菜。我还需要更多的研磨机,这样我就可以更快地生产辣椒酱了。

  我:没错,所以现在你就不得不分配工作了,你将需要几个人一起切蔬菜。每个人都要处理满满一袋的蔬菜,而每一个人都相当于在执行一个简单的Map操作。每一个人都将不断的从袋子里拿出蔬菜来,并且每次只对一种蔬菜进行处理,也就是将它们切碎,直到袋子空了为止。

  这样,当所有的工人都切完以后,工作台(每个人工作的地方)上就有了洋葱块、番茄块、和蒜蓉等等。   妻子:但是我怎么会制造出不同种类的番茄酱呢?

  我:现在你会看到MapReduce遗漏的阶段—搅拌阶段。MapReduce将所有输出的蔬菜碎都搅拌在了一起,这些蔬菜碎都是在以key为基础的map操作下产生的。搅拌将自动完成,你可以假设key是一种原料的名字,就像洋葱一样。 所以全部的洋葱keys都会搅拌在一起,并转移到研磨洋葱的研磨器里。这样,你就能得到洋葱辣椒酱了。同样地,所有的番茄也会被转移到标记着番茄的研磨器里,并制造出番茄辣椒酱。

  披萨终于做好了,她点点头说她已经弄懂什么是MapReduce了。我只希望下次她听到MapReduce时,能更好的理解我到底在做些什么。

网上其他人用最简短的语言解释MapReduce:

We want to count all the books in the library. You count up shelf #1, I count up shelf #2. That’s map. The more people we get, the faster it goes.

  我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。

  Now we get together and add our individual counts. That’s reduce.

  现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”

2.2RDD操作函数

transformation

action

 

函数具体功能介绍

<1>将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素, 输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区.

data = sc.textFile("/tmp/hive/tmp/1.txt")
data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at :21
//使用map算子
mapresult = data.map()
mapresult: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at :23

 

<2>flatMap,属于Transformation算子,第一步和map一样,最后将所有的输出分区合并成一个

<3>count返回RDD中的元素数量

rdd.count()

 

 

<4>reduce,根据映射函数对RDD里的元素进行二元计算结果,返回计算结果

rdd.reduce(lambda x,y:x.a+y.a,x.b+y.b)

 

 

<5>collect用于将一个RDD转换成数组

rdd.collect

<6>countByKey(),countByKey用于统计RDD[K,V]中每个K的数量

rdd.countBykey

<7>foreach(),foreach用于遍历RDD,将函数f应用于每一个元素?<8>saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中

rdd.saveAsTextFile("/tmp/hive/itcast/python-bigdata.txt")

2.3哈姆雷特单词分析案例

hdfs文件操作

<1>将本地的Hamlet.txt上传到hdfs上

hadoop fs -put Hamlet.txt /tmp/hive/itcast/

其他操作

获取hdfs文件到本地

hadoop fs -get /tmp/hive/itcast/python.txt ./

列出hdfs文件系统根目录下的目录和文件

hadoop fs -ls /

rm操作

hadoop fs -rm < hdfs file > ...
hadoop fs -rm -r < hdfs dir>...
每次可以删除多个文件或目录

spark运行原理程序分析

Spark应用作为独立的进程运行,由驱动程序中的SparkContext协调。这个context将会连接到一些集群管理者(如YARN),这些管理者分配系统资源。集群上的每个worker由执行者(executor)管理,执行者反过来由SparkContext管理。执行者管理计算、存储,还有每台机器上的缓存。

sc

开启spark后,SparkContext简略写法sc,对于sc可以进行文件的读取以及节点之间的协调

<1>读取文件并转换成一个RDD数据集

sc.textFile("itcast-python.txt")

<2>将数据从一个节点发送到其它节点上

#读取RDD类型的数据集,并且返回一个broadcast类型的数据

#读取RDD类型的数据集,并且返回一个broadcast类型的数据
bdata = sc.broadcast(data)

 

from operator import add
text = sc.textFile("/tmp/hive/itcast/Hamlet.txt")
def tokenize(text):
return text.split()

words = text.flatMap(tokenize)
wc = words.map(lambda x: (x,1))
counts = wc.reduceByKey(add)
counts.saveAsTextFile("/tmp/hive/itcast/hm")

 

 

 

   
次浏览       
相关文章

基于EA的数据库建模
数据流建模(EA指南)
“数据湖”:概念、特征、架构与案例
在线商城数据库系统设计 思路+效果
 
相关文档

Greenplum数据库基础培训
MySQL5.1性能优化方案
某电商数据中台架构实践
MySQL高扩展架构设计
相关课程

数据治理、数据架构及数据标准
MongoDB实战课程
并发、大容量、高性能数据库设计与优化
PostgreSQL数据库实战培训