为了满足挖掘分析与交互式实时查询的计算需求,腾讯大数据使用了Spark平台来支持挖掘分析类计算、交互式实时查询计算以及允许误差范围的快速查询计算,目前腾讯大数据拥有超过200台的Spark集群,并独立维护Spark和Shark分支。Spark集群已稳定运行2年,积累了大量的案例和运营经验能力,另外多个业务的大数据查询与分析应用,已在陆续上线并稳定运行。在SQL查询性能方面普遍比MapReduce高出2倍以上,利用内存计算和内存表的特性,性能至少在10倍以上。在迭代计算与挖掘分析方面,精准推荐将小时和天级别的模型训练转变为Spark的分钟级别的训练,同时简洁的编程接口使得算法实现比MR在时间成本和代码量上高出许多。
Spark VS MapReduce
尽管MapReduce适用大多数批处理工作,并且在大数据时代成为企业大数据处理的首选技术,但由于以下几个限制,它对一些场景并不是最优选择:
1.缺少对迭代计算以及DAG运算的支持
2.Shuffle过程多次排序和落地,MR之间的数据需要落Hdfs文件系统
Spark在很多方面都弥补了MapReduce的不足,比MapReduce的通用性更好,迭代运算效率更高,作业延迟更低,它的主要优势包括:
1.提供了一套支持DAG图的分布式并行计算的编程框架,减少多次计算之间中间结果写到Hdfs的开销
2.提供Cache机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的IO开销
3.使用多线程池模型来减少task启动开稍,shuffle过程中避免不必要的sort操作以及减少磁盘IO操作
4.广泛的数据集操作类型
MapReduce由于其设计上的约束只适合处理离线计算,在实时查询和迭代计算上仍有较大的不足,而随着业务的发展,业界对实时查询和迭代分析有更多的需求,单纯依靠MapReduce框架已经不能满足业务的需求了。Spark由于其可伸缩、基于内存计算等特点,且可以直接读写Hadoop上任何格式的数据,成为满足业务需求的最佳候选者。
应用Spark的成功案例
目前大数据在互联网公司主要应用在广告、报表、推荐系统等业务上。在广告业务方面需要大数据做应用分析、效果分析、定向优化等,在推荐系统方面则需要大数据优化相关排名、个性化推荐以及热点点击分析等。这些应用场景的普遍特点是计算量大、效率要求高。Spark恰恰满足了这些要求,该项目一经推出便受到开源社区的广泛关注和好评。并在近两年内发展成为大数据处理领域最炙手可热的开源项目。本章将列举国内外应用Spark的成功案例。
1.腾讯。广点通是最早使用Spark的应用之一。腾讯大数据精准推荐借助Spark快速迭代的优势,围绕“数据+算法+系统”这套技术方案,实现了在“数据实时采集、算法实时训练、系统实时预测”的全流程实时并行高维算法,最终成功应用于广点通pCTR投放系统上,支持每天上百亿的请求量。基于日志数据的快速查询系统业务构建于Spark之上的Shark,利用其快速查询以及内存表等优势,承担了日志数据的即席查询工作。在性能方面,普遍比Hive高2-10倍,如果使用内存表的功能,性能将会比Hive快百倍。
2.Yahoo。Yahoo将Spark用在Audience Expansion中的应用。Audience
Expansion是广告中寻找目标用户的一种方法:首先广告者提供一些观看了广告并且购买产品的样本客户,据此进行学习,寻找更多可能转化的用户,对他们定向广告。Yahoo采用的算法是logistic
regression。同时由于有些SQL负载需要更高的服务质量,又加入了专门跑Shark的大内存集群,用于取代商业BI/OLAP工具,承担报表/仪表盘和交互式/即席查询,同时与桌面BI工具对接。目前在Yahoo部署的Spark集群有112台节点,2TB内存。
3.淘宝。阿里搜索和广告业务,最初使用Mahout或者自己写的MR来解决复杂的机器学习,导致效率低而且代码不易维护。淘宝技术团队使用了Spark来解决多次迭代的机器学习算法、高计算复杂度的算法等。将Spark运用于淘宝的推荐相关算法上,同时还利用Graphx解决了许多生产问题,包括以下计算场景:基于度分布的中枢节点发现、基于最大连通图的社区发现、基于三角形计数的关系衡量、基于随机游走的用户属性传播等。
4.优酷土豆。优酷土豆在使用Hadoop集群的突出问题主要包括:第一是商业智能BI方面,分析师提交任务之后需要等待很久才得到结果;第二就是大数据量计算,比如进行一些模拟广告投放之时,计算量非常大的同时对效率要求也比较高,最后就是机器学习和图计算的迭代运算也是需要耗费大量资源且速度很慢。最终发现这些应用场景并不适合在MapReduce里面去处理。通过对比,发现Spark性能比MapReduce提升很多。首先,交互查询响应快,性能比Hadoop提高若干倍;模拟广告投放计算效率高、延迟小(同hadoop比延迟至少降低一个数量级);机器学习、图计算等迭代计算,大大减少了网络传输、数据落地等,极大的提高的计算性能。目前Spark已经广泛使用在优酷土豆的视频推荐(图计算)、广告业务等。
腾讯大数据Spark的概况
腾讯大数据综合了多个业务线的各种需求和特性,目前正在进行以下工作:
1.经过改造和优化的Shark和Spark吸收了TDW平台的功能,如Hive的特有功能:元数据重构,分区优化等,同时可以通过IDE或者洛子调度来直接执行HiveSql查询和定时调度Spark的任务;
2.与Gaia和TDW的底层存储直接兼容,可以直接安全且高效地使用TDW集群上的数据;
3.对Spark底层的使用门槛,资源管理与调度,任务监控以及容灾等多个功能进行完善,并支持快速的迁移和扩容。
Spark在相似度计算方面的应用
相似度是指两个节点之间特定属性的相似程度,相似度计算是数据挖掘、推荐引擎中的最基本问题。例如在推荐系统中通过计算推荐物品的相似度,从而给目标用户推荐与他喜欢的物品相似度较高的物品,或是计算用户之间的相似度,给目标用户推荐与其相似的用户喜欢的物品。因此,相似度计算技术在很大程度上决定着推荐系统的性能。
随着大数据时代的来临,日益增加的数据量使得单机的计算能力已经远远无法满足需求。在对大规模的节点对进行相似度计算时,分布式处理往往是可行的解决方案。MapReduce是目前流行的分布式编程框架。Hadoop与Spark是MapReduce编程模型的两个开源实现。相比于Hadoop,Spark提供了cache机制,增加了对迭代计算的支持;还提供了DAG调度来支持复杂的计算任务,减少了中间结果的磁盘读写,能够获得更佳的性能。
问题描述
输入数据可以表示成两张表:
1.节点关系表relation,字段有id, fid,表示两个节点存在关系。
2.节点特征表features,字段有id, feature,表示每个节点具有的特征信息。
下列两个表格表示了在一个拥有6个节点的关系网络中,节点关系表和节点特征表的情况。
相似度计算即是对节点关系表中的所有节点对 (id,fid),其特征向量分别为 和,利用相似度计算函数similarity-Calculation,计算和之间的相似度。相似度计算函数similarity-Calculation依据具体的相似度衡量方法而定。
MapReduce 解决方案
Hive是建立在Hadoop之上提供SQL接口处理的海量数据处理工具,对于上述相似度计算问题,其计算流程可以用如下SQL来描述,并使用Hive来计算。
整个计算流程可以分为两个步骤:
通过两次JOIN操作,生成一张临时表,临时表中的一个元组对应节点关系表中的一对节点和这两个节点的特征向量。
遍历临时表,对每个元组中的两个节点计算其相似度。
下图展示了该SQL语句的执行过程:
使用Hive对千亿节点关系记录进行相似度计算,两次JOIN操作成为性能的主要瓶颈瓶颈。在两次JOIN的过程中,网络数据传输和磁盘读写达到了200TB,集群多数结点的硬盘无法支持,任务失败经常发生,作业运行了时间超过了24小时。通过将节点关系表拆分成多个子表,每个子表独立地进行相似度计算,多个子表的任务并行执行,最后再将多个子作业的结果汇总,得到最终结果。采用这样的方式,作业总时间仍然超过了24小时。
Spark解决方案
通过对Hive计算过程的分析,我们发现网络数据开销主要来自于节点特征向量的大量复制。对于节点关系表中的每对关系,计算时都需要得到两个节点的特征向量,从而导致了大量的数据复制。因此,我们从两个方面去减少数据复制:
1.采用二维图划分的思想,减少节点的复制数目
2.每个数据分区中,对于同一个节点,只保留一份该节点特征向量
二维图划分方法:
任何一张关系网络,都可以用一个大矩阵M来表示,矩阵的两个维度用来表示节点,矩阵的元素M[i, j]表示节点i和节点j是否存在关联,如果存在,则M[i,j]值为1,否则,M[i,
j]值为0。下图展示了通过采用二维划分的方法,将一个矩阵划分成了16个分区。
使用二维划分可以减少节点的复制数目。假设分区总数为,采用一维划分的方法,最差情况下每个节点的复制份数是,即每个分区都会有该节点的复制;采用二维划分方法,最差情况下每个节点的复制份是
。对于大数据量,分区总数通常很大,所以采用二维划分通常可以减少每个节点的复制份数。
计算步骤:
利用二维划分方法将节点关系表划分成多个数据分区,假设我们将分区数设为4,则Table
1所示的节点关系表将会划分到4个分区,每个元组对应的分区如下Table 3所示:
根据每个分区中的节点列表,计算出每个节点所在的分区列表,称为路由表,记录了每个节点所在的分区信息,其结果如Table
4所示。
根据路由表将每个节点的特征向量发送至每个分区之中,保证每个分区中一个节点只保存一份特征向量,如Table
5所示。
对于每个分区,将该分区的关系集合与该分区中所有结点的特征向量进行关联,遍历每对节点关系,利用相似度函数和特征向量计算二者的相似度。
通过以上步骤,即可以计算出节点关系表中每对节点的相似度。与MapReduce的计算方法相比,如果一个用户多次出现在同一个分区中,比如用户1在分区1中出现了两次,上述计算步骤只会将用户1的特征向量发送一份到分区1中,但是MapReduce的计算方法会发送两次,产生冗余的网络数据传输。使用上述计算方法,我们将网络传输量降到了50
T,远小于MapReduce方法的网络传输量。
系统层次优化:
除了在计算流程上进行改进,我们还对Spark进行了以下方面的优化:
1.优化分区参数设置。在相似度计算的应用中,分区个数越多,会导致节点的复制份数增加,从而增大网络数据传输量。因此我们基于中间结果的统计信息来确定确定分区个数,使得在充分利用每个节点内存和CPU的前提下,最小化分区个数。
2.优化内存表示。由于数据量大,对象个数多,导致内存使用量较高,GC时间较长。我们使用列存储格式来对内存数据进行压缩,减少数据量的同时也减少了对象个数。
3.提高网络稳定性。随着集群中机器数目的增加,网络连接数也会成倍增加。当网络出现拥挤时,经常会伴随着连接超时从而导致shuffle数据拉取失败。更糟糕的情况是,网络超时会让Master误认为Executor已经丢失,故会使得整个Executor上已经完成的任务全部重做。因此在shuffle时增加网络超时重试机制,同时控制每次发送的请求连接数,避免shuffle拉数据超时,减少任务失败次数,防止Executor丢失的情况出现。
4.使用sort-based shuffle时将文件块索引信息缓存一份在内存中,后续拉数据时直接读内存获取索引信息。预测执行时,当同一任务的一批运行实例有一个完成时,杀掉正在运行的其余实例,提早释放计算资源。
5.参数调整。由于每个Executor进程还会使用到堆外内存,因此Executor进程占用的内存往往会大于JVM设定的最大值,为了保证Gaia不会将超过JVM内存的Executor进程杀掉,配置参数yarn.executor.memoryOverhead以免被kill。由于Executor在Full
GC时需要较长时间,需要配置参数spark.storage.blockManagerSlaveTimeoutMs来延长blockManager的超时时间。
实验对比
实验环境:分别在拥有200台、600台和1000台TS5机器节点的集群上进行了对比,每台机器拥有64GB内存,2*12T硬盘,24线程CPU。在两个数据集上进行了Hadoop、社区GraphX和TDW-Spark的性能对比,一个数据集拥有五百亿节点对,而另一个拥有千亿量级的节点对。实验结果如下表所示:
通过上述实验对比,可以看出在MapReduce上的实现的性能远远低于在Spark上的性能,使用JOIN的方法使得网络通信开销非常大,五百亿数据集的任务执行时间超过12个小时,千亿数据集任务执行时间超过24个小时;GraphX采用的同样是二维图划分,但是由于其是一个面向通用的图计算框架,维护了复杂的数据结构和计算流程,造成性能下降。同时,GraphX在网络稳定性方面存在许多问题,当集群规模达到600台时便会有大量的任务失败。
与前两者相比,TDW-Spark在集群为200台时在两个数据集上都获得了较大的性能增长,所消耗时间少于GraphX的一半。当集群规模从200台扩充至600台,TDW-Spark在五百亿节点对数据集上获得加速比218%,在千亿节点上的加速比为280%;当集群规模从200台扩充至1000台时,加速比分别为279%和350%。因此,TDW-Spark不仅在性能上获得了很大的提升,还可以在千台规模的集群之上稳定运行,同时获得良好的水平扩展能力。
Spark在基于物品的协同过滤推荐算法的应用
互联网的发展导致了信息爆炸。面对海量的信息,如何对信息进行刷选和过滤,将用户最关注最感兴趣的信息展现在用户面前,已经成为了一个亟待解决的问题。推荐系统可以通过用户与信息之间的联系,一方面帮助用户获取有用的信息,另一方面又能让信息展现在对其感兴趣的用户面前,实现了信息提供商与用户的双赢。
协同过滤推荐(Collaborative Filtering Recommendation)算法是最经典最常用的推荐算法,算法通过分析用户兴趣,在用户群中找到指定用户的相似用户,综合这些相似用户对某一信息的评价,形成系统对该指定用户对此信息的喜好程度预测。协同过滤可细分为以下三种:
1.User-based CF: 基于User的协同过滤,通过不同用户对Item的评分来评测用户之间的相似性,根据用户之间的相似性做出推荐;
2.Item-based CF: 基于Item的协同过滤,通过用户对不同Item的评分来评测Item之间的相似性,根据Item之间的相似性做出推荐;
3.Model-based CF: 以模型为基础的协同过滤(Model-based
Collaborative Filtering)是先用历史资料得到一个模型,再用此模型进行预测推荐。
问题描述
输入数据格式:Uid,ItemId,Rating (用户Uid对ItemId的评分)。
输出数据:每个ItemId相似性最高的前N个ItemId。
由于篇幅限制,这里我们只选择基于Item的协同过滤算法解决这个例子。
算法逻辑:基于Item的协同过滤算法的基本假设为两个相似的Item获得同一个用户的好评的可能性较高。因此,该算法首先计算用户对物品的喜好程度,然后根据用户的喜好计算Item之间的相似度,最后找出与每个Item最相似的前N个Item。该算法的详细描述如下:
1.计算用户喜好:不同用户对Item的评分数值可能相差较大,因此需要先对每个用户的评分做二元化处理,例如对于某一用户对某一Item的评分大于其给出的平均评分则标记为好评1,否则为差评0。
2.计算Item相似性:采用Jaccard系数作为计算两个Item的相似性方法。狭义Jaccard相似度适合计算两个集合之间的相似程度,计算方法为两个集合的交集除以其并集,具体的分为以下三步。
1、Item好评数统计,统计每个Item的好评用户数。
2、Item好评键值对统计,统计任意两个有关联Item的相同好评用户数。
3、Item相似性计算,计算任意两个有关联Item的相似度。
找出最相似的前N个Item。这一步中,Item的相似度还需要归一化后整合,然后求出每个Item最相似的前N个Item,具体的分为以下三步。
1.Item相似性归一化。
2.Item相似性评分整合。
3.获取每个Item相似性最高的前N个Item。
基于MapReduce的实现方案
使用MapReduce编程模型需要为每一步实现一个MapReduce作业,一共存在包含七个MapRduce作业。每个MapReduce作业都包含Map和Reduce,其中Map从HDFS读取数,输出数据通过Shuffle把键值对发送到Reduce,Reduce阶段以<key,Iterator<value>>作为输入,输出经过处理的键值对到HDFS。其运行原理如图所示:
七个MapReduce作业意味着需要七次读取和写入HDFS,而它们的输入输出数据存在关联,七个作业输入输出数据关系如图所示。
基于MapReduce实现此算法存在以下问题:
1.为了实现一个业务逻辑需要使用七个MapReduce作业,七个作业间的数据交换通过HDFS完成,增加了网络和磁盘的开销。
2.七个作业都需要分别调度到集群中运行,增加了Gaia集群的资源调度开销。
3.MR2和MR3重复读取相同的数据,造成冗余的HDFS读写开销。
这些问题导致作业运行时间大大增长,作业成本增加。
基于Spark的实现方案
相比与MapReduce编程模型,Spark提供了更加灵活的DAG(Directed Acyclic
Graph) 编程模型,不仅包含传统的map、reduce接口,还增加了filter、flatMap、union等操作接口,使得编写Spark程序更加灵活方便。使用Spark编程接口实现上述的业务逻辑如图所示。
相对于MapReduce,Spark在以下方面优化了作业的执行时间和资源使用。
1.DAG编程模型。通过Spark的DAG编程模型可以把七个MapReduce简化为一个Spark作业。Spark会把该作业自动切分为八个Stage,每个Stage包含多个可并行执行的Tasks。Stage之间的数据通过Shuffle传递。最终只需要读取和写入HDFS一次。减少了六次HDFS的读写,读写HDFS减少了70%。
2.Spark作业启动后会申请所需的Executor资源,所有Stage的Tasks以线程的方式运行,共用Executors,相对于MapReduce方式,Spark申请资源的次数减少了近90%。
3.Spark引入了RDD(Resilient Distributed
Dataset)模型,中间数据都以RDD的形式存储,而RDD分布存储于slave节点的内存中,这就减少了计算过程中读写磁盘的次数。RDD还提供了Cache机制,例如对上图的rdd3进行Cache后,rdd4和rdd7都可以访问rdd3的数据。相对于MapReduce减少MR2和MR3重复读取相同数据的问题。
效果对比
测试使用相同规模的资源,其中MapReduce方式包含200个Map和100个Reduce,每个Map和Reduce配置4G的内存;
由于Spark不再需要Reduce资源,而MapReduce主要逻辑和资源消耗在Map端,因此使用200和400个Executor做测试,每个Executor包含4G内存。测试结果如下表所示,其中输入记录约38亿条。
对比结果表的第一行和第二行,Spark运行效率和成本相对于MapReduce方式减少非常明显,其中,DAG模型减少了70%的HDFS读写、cache减少重复数据的读取,这两个优化即能减少作业运行时间又能降低成本;而资源调度次数的减少能提高作业的运行效率。对比结果表的第二行和第三行,增加一倍的Executor数目,作业运行时间减少约50%,成本增加约25%,从这个结果看到,增加Executor资源能有效的减少作业的运行时间,但并没有做到完全线性增加。这是因为每个Task的运行时间并不是完全相等的,
例如某些task处理的数据量比其他task多;这可能导致Stage的最后时刻某些Task未结束而无法启动下一个Stage,另一方面作业是一直占有Executor的,这时候会出现一些Executor空闲的状况,于是导致了成本的增加。
|