求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Modeler   Code  
会员   
 
  
 
 
     
   
分享到
Beatles小记
 

作者:cenwenchu79 ,发布于2012-8-14,来源:博客

 

目录:

Beatles小记-分布式数据流分析框架(一)

Beatles小记-分布式数据流分析框架(二)

Beatles小记(三)-分布式数据流分析中Master的横向扩展

Beatles小记-分布式数据流分析框架(一)

概述

这篇小记主要处于两方面考虑:首先,希望打破一提到海量数据分析,就只有hadoop基础上的一系列工具,更多的时候很多企业需要的是更轻量的设计(办喜酒杀猪杀鸡未必都要用一把刀),因此将开放平台基础分析组件重构版本beatles的设计写出来,给出更多的思考空间。其次,也是希望推广一种思想,所有的系统,框架设计简化(可扩展),小部件精致化,这样才能让很多项目能够整体灵活,细节给力。

建议

这篇小记一共分成4部分,概述,整体设计,局部设计,待续。如果你只想了解个背景,那么看完概述即可,如果对于流式分析的大框架设计感兴趣(看看省略了分布式计算集群的什么?核心设计是怎么样的),请仔细看完整体设计。如果还对代码优化有兴趣,那请看局部设计(细到代码功能级别)。最后留下的待续,将会增加后续的一些扩展,及同学看完后提出问题的解答(比较通用的一些问题,例如容灾啊,啥啥啥很多被认为很重要的东西)

背景

07年底开始做开放平台,当时每天访问量在4kw左右,考虑到开放平台的数据透明化需求,开始考虑如何做统计分析,当时需求是一天出一次结果即可,因此自己摸索搭建Hadoop迷你集群,开始了分析之路。09年公司调整加入淘宝开放平台,当时每天服务调用量2亿,数据分析要求比较散,从服务的系统数据统计到业务趋势统计都有涉及,而且统计需求变化较多,因此开始筹备自己写简单的统计抽象模型来规避MapReduce类,提高适应变化的能力,同时出于简化系统设计维护的要求,直接将每日分析数据放置到集中服务器上,每日拉取,切割,分析(统计分析引擎抽象完成)。2010年开放平台基础体系开始建立,对于服务质量,应用行为,用户安全都提到了较高的要求上来,分析结果从原来的统计分析,扩展到了监控告警,每日分析转变为增量分析(频度1小时左右),简化的任务调度模型抽象出来,同时服务调用量增涨到了9亿。2011年平台数据统计分析及时性要求更高,同时开始开放统计数据给外部开发者(系统可用性和效率要求更高),整体框架和局部设计不断优化和改进,截至今年11月,单日最大服务调用量19亿,增量统计实时性要求在2分钟内(包含数据分析和数据产出,低峰期1分钟,高峰期1分半),系统可用性要求高于99.6%,而投入的服务器比起动则几十台甚至上百上千的Hadoop系统来说,就是一个迷你集群(一台Master实体机(16核,16g内存),12台虚拟机(虚后5核,8g内存,实际为4台实体机)),每天负责600g增量数据分析,产出1.5g数据。

很多时候很多开发人员会问到说在业务和代码结构优化冲突的时候怎么办,老板要结果,而程序员要的是看起来不恶心的代码,但很多时候,我们就是在摸索中做事。上面描述的背景就好比开始买的是件夏天穿的短袖,然后天气不断变冷,开始给短袖加袖子,然后在身上贴补丁,但最后真的要到冬天的时候,应该怎么办,在秋天和冬天之间,作为核心代码负责人,就应该保证系统可用性的情况下做好另一手准备(简单来说,时间不是别人给的,而是自己给的,天晴补漏,雨天不愁)。因此年末的两周将2年中断断续续走过的路,重新整理了一遍,取名为beatles(甲壳虫),因为甲壳虫虽小,但聚集起来能够吃掉一大片叶子(业务系统各种需要分析的数据),因此这个框架首先是个很小的内核(希望有更多扩展和参与者),其次不是一个从头开始的项目,而是一个两年多断断续续演进产品的积累。

Beatles不是一个万能的技术产品,它出生和发展就为它适合的场景做了定义,因此使用和扩展的时候需要明确的了解是否合适,避免勿用,下面两个图会大志说一下它的特点和适用场景。

上面这张图左边部分是Beatles可以局部牺牲的,右边部分是场景要求的。由于是对流式数据的增量分析,因此对于历史数据的全量挖掘无能为例(这部分完全可以用Hadoop这种离线分析系统来做)。数据精确性要求所有数据在分析的任何一个环节都要做好保护(数据输入,分析过程,数据输出),而这种强完整性要求势必会使得系统的效率和可用性降低(和右边实时性矛盾),因此会被放低一些要求(类似于计费结算等就直接一天走一次分析即可)。在左面所看重的三个部分大致分布的场景为:监控告警(业务,系统,用户,平台透明化),业务即时分析对比(ABtest),系统灰度发布对比,用户实时统计展示(非金额等数据一致性要求较高的内容展示)。

整体设计

要满足上面所说的场景,实时流式数据分析需要做哪些功能?

任务调度抽象

Beatles的任务调度十分简单,遵循两个原则:1.按需分配(Slave的多少及Slave自身执行任务的快慢自然促成的分配方式)2.任务粒度细化,粗暴简单的任务重置(通过透明化监控任务可能出现的问题,避免集群陷入一个任务的纠结中)。优势:简单,高效,易扩展(Slave随时来,随时走)。劣势:对于任务执行可控度较弱(通过任务细粒度和粗暴重置状态的方式来降低风险,增加的只是节点重复计算的浪费可能性)。

master处理流程

可以看到Master整体就两部分工作,对内部任务的管理维护,对外部slave请求的处理(请求获取任务,返回处理后的结果(Slave也可以不返回结果,根据Job定义来判断,防止Master变重))。Master单点并不可怕,只要遵循两个原则:现场可快速恢复,分析流程可追赶。因此做到Master所有状态定期外移和实时监控,即可满足这种简单的Master可用性需求。

slave处理流程

Slave更为单纯,整个生命周期就是获取任务,分析任务,返回任务结果的一个环,内置一个分析引擎和交互组件,根据任务的定义来无差别化的处理各种分析工作(Job定义了数据的输入来源,输出目标地址,分析规则)。Slave的设计主要考虑如何做到无业务规则侵入和数据来源限制,满足了这些需求的情况下才能够实现节点处理无差别性,各种分析任务可以跑在一个集群上(实现计算节点可复用)。

任务抽象

任务抽象设计比较简单,主要结合任务调度设计,实现计算节点的无差别化。

Job是一类分析的定义(例如对gc的日志分析,对服务调用日志分析可以定义为两个Job),Job中的Task表示对于这一类数据分析再次拆分任务,来分解海量数据处理,Task中继承了Job中的输入和输出,支持多种模式的数据来源和数据输出。Rule就是分析统计模型抽象部分主要分成:Alias(对于分析数据的列别名定义),Entry的MapReduce的定义,Report是Entry整合成用户可接受的Report的定义。

统计模型抽象

统计模型抽象主要分为两部分:统计模型抽象和统计流程抽象。统计模型抽象就是将MapReduce的Key-Value统计,转化成为传统意义上的报表结构。

分析的输入:(弱业务含义的大表)

c1,c2,c3,c4,c5,c6

c1,c2,c3,c4

c1,c2,c3,c4,c5,c6,c7

……

MapReduce可以处理的:

如下图,传统报表的一行可以看作是多个相同key但不同统计字段组合的结果。

例如:输入的数据结构如下:

服务名称,服务类型,服务上行数据流量,服务处理结果(错误码),服务耗时

真实日志如下:(分隔符可在分析时指定,这里用逗号作演示)

taobao.user.get,read,100,0,20

taobao.product.add,write,1000,0,50

……

那么定制如下MapReduce组合:

Key:服务名称,Value:服务上行数据流量总和。

Key:服务名称,Value:服务耗时总和。

Key:服务名称,Value:服务平均耗时。

Key:服务名称,Value:服务最大耗时。

Key:服务名称,Value:服务最小耗时。

那么将这些MapReduce处理后的Key-value在组合一次就可以得到:

Key:服务名称,Value:服务上行数据流量总和,服务耗时总和,服务平均耗时,服务最大耗时,服务最小耗时。

简单来说其实就是类似于SQL中的Groupby的方式,将一堆<key,value> groupby key。

分析流程抽象如下:

分析流程抽象

流程中可以扩展的在第三步和第四步,第三步影响了Key的生成(当简单的列组合成字符串无法满足生成key的情况下可扩展),第四步影响value的生成。(当map的value生成以及Reduce无法满足需求的情况下可扩展),要使用min,max…以外的reduce,可以直接在ReduceClass中作处理,然后使用plain输出实现。

这种流程比传统的MapReduce的写法好处在于可以对输入只读取一次(海量的日志文件为了多种条件分析,反复读取本身就是最大的损耗)。可以看到在文件IO操作上,不会随着分析模型配置的增多而增长,中间数据也不会随着报表组合的不同而过快膨胀(只要报表复用Entry足够多)。

整体组件和流程设计

角色定义

Beatles内部业务组件如上图。

Master包含两个子组件,JobManager用于管理任务,MasterConnector用于与Slave通信。

Slave包含三个组件,SlaveConnector用于与服务端通信,AnalysisEngine用于数据分析,JobResultMerger可选,用于在客户端分担服务端汇总结果的压力,同时让Slave可以多线程并行执行任务。(当然单机可以跑多个Slave的实例)。

Job&Task已经提到过用于任务抽象,支持Slave的Analysis Engine的分析无差别性。

Input&output用于扩展整个框架的各种数据来源,例如job构建的来源,job的输入和输出等。

整体流程

1. Master利用jobManager通过JobBuilder来构建服务端的任务集合。

2. Slave向Master发起要任务的请求。

3. 通过Master和Slave的Connector来做交互。

4. MasterConnector向MasterNode内部的事件处理模块提交事件。

5. JobManager检查内部任务状态后返回未完成且符合条件的任务返回给Slave。

6. SlaveNode收到任务后调用内部分析引擎并行执行任务分析。

7. 分析引擎获得任务的数据来源,开始分析数据。

8. 如果是多个任务并行执行,合并同一个Job的多个Task的结果。

9. 导出分析后的结果

10. 如果是需要汇总到Master的话,利用SlaveConnector返回给Master。

11. MasterConnector获得返回的分析结果数据。

12. MasterNode类似走事件流程,然后调度到合并组件合并结果。

13. 当Job任务全部完成就调用JobExporter导出数据。

代码结构体系:sourcecode:(https://github.com/cenwenchu/beatles)

整体包结构

整个项目内容不多,根据包名的前缀可以发现主要分成两块:node,Statistics。前者是任务调度及任务抽象,后者是统计分析模型抽象。

Config中是多个角色各自的config定义,同时这些config会在一个实体里传播,例如MasterConfig就在MasterNode中传播到jobManager和MasterConnector组件中,SlaveConfig就在SlaveNode传播到分析引擎组件和SlaveConnector中。

Node中的结构如下:

Component:对Node的各个组件接口的实现。

Connect:Master与Slave交互的接口定义和实现。

Io:对于Job的输入输出来源的接口定义和默认几个实现。

Event:定义了Master和Slave这样的Node中需要处理的事件。

Job:任务抽象定义。

Map,Reduce:支持当分析引擎无法满足的Map,Reduce的情况。(足够通用的情况下可以被抽象到主框架中)

Operation:定义了Node结构中需要异步处理事件。(因为当前Node的Event是单线程处理的,因此事件执行如果比较消耗,则需要异步后台执行,或者并行执行)

Util包是一些工具类和定义类。

Staitistics是分析引擎接口和实现,其中Data中是分析规则的抽象。

至此为止,整体的结构设计就如上所述了,整体上结构比较简单直接,可扩展性为了支持分析规则扩展,不同计算场景扩展,效率和可靠性扩展。下一个部分将会细化到具体的模块代码设计上来谈优化和代码技巧。

Beatles小记-分布式数据流分析框架(二)

局部设计

首先要说明的是,这部分内容和第一篇不同,必须对照代码看才会理解其中的含义,光看设计实现会比较难懂其中所说的细节点。代码:https://github.com/cenwenchu/beatles

IComponent:

做系统就像搭积木一样,这些组件最后都会拼装起来,而积木往往由于内部机制可定制化需要config,同时组合在一起的积木往往会有一个传递的Config(可以认为是静态的Context)。

INode:

Node的设计比较简单,自身是Runnable的单线程循环体,内置一个单线程事件监听分发器。Node的主线程主要负责该Node自身产生的事件处理(常规已知事件处理):Master就是维护任务列表状态,并根据任务执行情况做一些Action,Slave就是重复的获取,执行任务,返回结果。而Node中的事件监听器主要用于外部消息驱动事件处理(偶发性事件处理),例如Master收到Slave的请求,外部要求导出载入中间结果等。这里会发现采用的是单线程阻塞检查获取事件:

1.多线程并发检查事件,对于事件承载者(队列)就要求做好并发控制,也就是最终在获取事件的过程中依然是串行化,所以大部分事件处理框架对于事件检查都采用单线程,简单高效。

2.单线程可以用于检查事件,但执行事件会采用多线程或者主线程直接处理,取决于事件执行速度和可靠性(外部依赖),如果事件可以快速执行(无外部依赖,逻辑简单),则采用检查线程直接处理(NIO中对于连接建立和销毁直接在主线程中处理掉,这里Master对于获取任务事件的前半段采用直接处理),如果事件消耗时间较久,或者依赖与外部系统稳定性和处理情况,则需要采用多线程异步处理(这里很多都是让内部组件来保证方法执行的快速返回,例如JobExporter的所有方法等处理都是内部线程异步完成,对外接口快速返回),如果必须要有结果回执,那么可以采用回调模式或者直接提交新事件(带有上下文可以接着上次处理)到事件处理引擎。

3.另一种方式会选择将事件分开放置来提高处理效率,例如TimeOut事件和普通外部激发事件,注意尽量避免通过轮询对象状态来判断事件发生,除了Timeout必须这么做,其他尽量通过对对象状态变更操作内置事件产生器来创建真实事件,这样事件处理者只需要阻塞等待事件即可,系统内状态规模增大对于事件处理时间复杂度还是O(1)。对于Timeout这类事件后续介绍SlaveConnector中有更详细介绍,至此略过。那是否需要将不同业务处理放置在不同的队列交由不同的单线程处理,取决于系统事件产生速度,就好比NIO中可以起多个Selector来处理,由于起多个单线程守护到队列如果利用率不高,其实对于系统来说也是一种负担,所以可以做成可配置的方式提供给外部(beatles中是没有提供配置,就一个线程,因为本身并不是大并发的web前端系统,接上千个slave的话消息量分布也不会非常密集,毕竟任务分析本身需要消耗时间)。

MasterNode中有两个组件:JobManager和MasterConnector,一个负责上层业务处理,一个负责消息通信,在MasterNode运作的时候,两者其实需要协调工作,例如MasterConnector可能会收到消息,需要提交给JobManager处理并获得结果返回。为了实现内部组件不会相互依赖(MasterNode内部成为网状结构),采用MasterNode作为中间消息传递者,通过事件或回调方式相互驱动,同时利用上下文(将Channel作为Event的一部结构,用于后续消息返回)来传递一些环境信息。需要注意的是,这种解耦的做法势必带来性能的下降,因此可以和前面提到的事件处理为多线程还是单线程一样,对于消息机制的依赖也不要盲从,按需使用,例如Connector通过事件提交给MasterNode,MasterNode接收事件后调用JobManager处理,处理后的结果也可以利用事件机制反向驱动Master去调用Connector,但也可以直接将MasterNode植入JobManager,反向利用代理模式来直接处理,这里关键看你是否需要释放掉你当前的线程,让任务异步去做,而当前线程可以回收去做更多的处理,带来的是线程切换和事件驱动的消耗。不过总体上来说让组件的宿主来完成交互,能够减少模块间依赖带来的耦合性和复杂度。

FileJobExporter

这个类主要用于文件输出,但在输出部分的代码中有lazymerge的部分,所谓的lazy merge指的是部分entry<key,value>的结果是依赖于处理后的部分结果而得到,例如成功率这个指标就是用成功数/总数。作为分析系统来说,如果成功数的<key,value>需要长期保存,总数的<key,value>需要长期保存,那是否需要在最终产出报表以前就将成功率的<key,value>计算并保存在内存中呢?其实大可不必,不仅浪费了cpu资源,也浪费了大量的内存资源,同时slave传递给master还会使得网络io消耗增大。在beatles中认为export就是最后的一步,因此在这个时候做计算和导出。在我们很多系统中,考虑一下很多中间结果是否需要输出,还是保留在最后一步输出(并不是保留在最后一步一定好,取决于代价,如果最后一步有大量的计算要做,那么可以用内存换机算,提早计算来减缓最后导出时的压力,如果导出时计算不大,而系统整体处理内存资源紧张,那么就滞后处理)。衍生开来很多时候,需要考虑重复计算带来的成本和节省内存带来的收获谁更有利,如果计算节点分散且规模巨大,则可以靠虑利用外部计算能力来减少集中式处理的代价(好比很多前端处理的结果可以滞后到客户端处理而不是服务端集中处理,开放平台的数据序列化推后到业务方集群处理而不是开放平台统一处理)

JobManager

由于MasterNode中是单线程调用,因此对于任务状态变更变得非常简单(无需并发控制和原子操作),但由于MasterNode将来还是可扩展为多个线程处理,因此暂时保留原子操作的处理模式。

1. 对于对象状态管理,如果对象层次比较多,尽量扁平化处理,就好比把TaskStatus直接保存,有利于检查和原子操作,带来的问题就是另一部分对象的状态同步变更(Task中的状态),其实简单来说就是两个数据结构修改要做到事务性,做法比较简单,细粒度的原子操作模拟锁争夺,例如要修改Task的Status首先要并发的修改TaskStatus的数据(if (statusPool.replace(taskId, JobTaskStatus.DOING, JobTaskStatus.DONE)),如果修改成功,才可以修改原始对象内的数据。其实如果是单线程都不需要并发控制(因为并发的模式还是有些消耗的)。

2. 事件驱动模型中很重要一点就是事件状态必须在所有必要操作后再改变(即创建事件),例如:早一个版本中,Master收到Slave返回结果时,将会把结果设置到Master的某一个Task的result属性中,同时改变Task的状态为done,这两个动作就必须保持一定的顺序,也就是先要把内容设置进去,然后再改变状态,因为如果先改变状态,外部事件处理线程如果发现状态已经改变,又没有锁保证结果放进去以前不能处理这个事件,就会发现事件开始被处理了,但是内容还是错过了处理,出现线程并发问题。这点在这个版本的源码注释上面有点问题,后续修改掉它。

3. 在主流程上有一个方法mergeAndExportJobs,用于检查Job内部的Tasks完成状态,决定是否合并或者导出结果,首先受限制于JobManager主流程是单线程处理,同时内部Tasks状态随时会变,因此要求主流程的所有操作和检查都必须非阻塞,保证处理的即时性,但如果这个方法里面的所有操作都变成另起线程异步处理的话,就同样会发生上面我谈过的事件检查多线程模式最终还是会并发控制下变成串行化,效率不升反降,因此采用同一业务性数据处理守护进程唯一性的方式(其实简单来说就是在这里Master中管理多个Job,多个Job其实就好比多个事件队列,因此必须并行处理,否则会有互相影响的风险,但是单个Job的处理可以只有一个守护线程处理,因此对Job加事件锁,保证不同Job之间同一个事件并行,同一个job不同事件并行(这里由于都是顺序化的,虽然并行了,但还是要等待上一个事件完成后才会修改内部状态继续往下走))

4. 在第一篇里面说到,这个框架对于任务执行异常的处理十分简单,事先规定好单个任务执行的最长可接受时间,如果到了时间尚未获得反馈,就认为出现问题,任务重置可以接受下一个计算节点的处理请求。(结果谁先返回就用谁的)这里其实要注意两点:任务时间可评估是基于任务切分粒度够细,其实很多时候可以考虑通过任务细化来降低任务出现问题解决的复杂度,同时也可以降低计算节点重新做任务的代价。另一方面需要设置重置次数透明化,保证如果任务本身有问题(例如数据来源出现问题),不会使得所有的计算节点陷入单个任务处理死循环。

5. 合并数据的代码优化:

A. Master合并时每一个Job只有一个主干,也就是最后job的所有Task Result都必须合并到这个主干,假设这是个svn主干,可以想象多个人(多线程)是无法并行合并的。那么当主线程在A时刻发现有4个结果需要合并的时候,它开始把4个结果合并到主干,合并的过程中可能又来了3个结果,那么这三个结果就必须等待下一轮的合并开始,此时这三个结果耗费的内存就会增加系统的负担,同时系统如果Slave越多,这样的情况越严重。因此引入下面一种模式,多线程合并,但主干和虚拟分支同时进行,当需要合并时首先竞争主干锁,得到主干锁的线程将这次需要合并的结果和以前合并的虚拟分支一起合并到主干,而如果没有得到主干锁的线程并行的合并结果到虚拟分支上。此时充分利用多核的计算能力来压缩对于内存的需求(结果合并后会大大减少存储的需求)。

B. 由于A中的描述可以看到,主干在整个Job的任务执行合并过程中都被保存在内存中,因此当结果集越大,主干对系统内存消耗就越大,而Job的多轮合并是否可以最后载入上一轮的主干和本轮增量结果合并,这样可以大大减少内存消耗,但是内容的导出和载入带来的序列化代价和IO的消耗势必会增加每一轮的处理时间,和减少GC带来的节省时间的优势可能会冲抵甚至有负面效果。因此通过异步载入和导出,即节省了内存占用,减少FullGC带来的停顿,又不影响处理,另一方面其实也是利用两个阶段的CPU闲置率较高来交换内存的代价。(这部分代码参看jobexporter和jobmanager)

SlaveNode:

充分利用Slave单机CPU的方式可以是:一台机器可以跑多个Slave。也可以跑一个Slave,单个Slave一次要求获取多个Task,这样可以并行利用多个cpu处理多个任务。

为了减少Master的合并压力,可以让Slave直接输出,也可以通过Slave要求多个Task,执行完多个Task在本地合并(Task必须是同一个Job才可以合并),再将合并后的结果会送给Master。

对于同一个数据源可以通过创建同样的多个Task来增加对其的处理速度,例如A机器的日志增长比B机器的快,那么可以配置,两个数据来源是A机器的Task,配置一个B机器的Task,来差别对待处理速度。

对于处理后的数据如果还需要二次处理,可以构建Job的数据来源是一次处理后的数据输出地,当一次数据输出以后,自然二次处理才会开始。

简单来说,很多复杂的sharding设计,reduce的考虑,任务迭代处理,其实都可以通过扁平化的方式来解决,有时候花很很大力气去做的看似很fancy的设计,不如归一化处理。(再大的数字都是从一衍生出来的)

Connector:

这部分设计主要是屏蔽掉分布式概念的误区,很多分布式设计开始的时候不是注重对于主节点和次节点的业务交互上,而是纠结于底层设计上,最后就会落得调试难,扩展难的情形。和上面的归一化设计思想一样,所谓的分布式其实可以是一个进程内(虚拟机内)的交互协作,一台机器多进程的交互协作,多台机器多进程的交互协作,因此如何能够适合这三个场景,就会让设计变得简单,容易扩展,实现与接口分离。

Event:

Event中需要考虑一些上下文设计,例如序列号保证松散交互的会话可维护性,Channel等后续操作的基础传递。Event尽量做到无业务侵入,例如虽然需要Channel,但不同的实现Channel是不同的,MemChannel和SocketChannel就不同,将来扩展更是不同,做好一些就抽象一些接口(但可能需要对一些实现做外壳封装适应接口),或者就直接Object弱化类型。

InputAdaptor&OutputAdaptor:

任务的自描述性除了业务规则的自描述性,更需要输入输出的自描述性,所有计算归结到底无非是输入,处理,输出,如果三者定义清楚,并且可以通过支持协议扩展适配,那么对于计算节点来说就非常通用了,不必因为业务的差别,数据来源和输出的差别来分别建立多个集群,最终还是发现多个集群无法很好的充分利用资源的高低峰(对于明确要保护的计算集群可以直接构建,对于一些非关键性的计算任务可以丢到一个集群中搞定),降低成本。

Job:

本身是一组任务的集合,自身有多个状态位,当前通过多个状态来表示(可以合并为一个原子状态位),内置一些锁来控制主干的并发访问,守护进程的分配。(这点在另一个PipeComet项目中对于长连接管道下行守护进程按需分配也有充分利用到)。

Operation:

这个包里面是将耗时的操作封装为可以被外部线程独立执行的Runnable,可以看见在整体代码里面有用外部线程异步执行的,也有直接在线程里面阻塞执行的,取决于对于结果返回的同步性需求,如果同步性需求明确,那么可以用异步+锁的方式来模拟同步,也可以直接同步,但前者代价较大,所以将这类操作抽象,上下文通过参数传递来构建出可以异步也可以同步执行的逻辑块,提高了功能执行的灵活性。

CreateReportOperation中的输出模式还是比较节省空间的,可以看一下如何基于<key,value>列矩阵输出报表这样的行式记录保持对内存较小的占用。

ReportUtil:

是个工具大杂烩。

1. mergeEntryResult。将多个矩阵结果合并的函数,里面有不少的节省内存的做法,首先选取第一个矩阵作为base,节省申请和合并的过程,合并过程中不断删除合并后的数据,节省后续合并成本,释放资源。

2. compressString。尝试采用不可逆压缩来减少处理中中间key占用的内存,例如每一个entry的key是几个列的组合,而key仅仅表示唯一性,如果能够做到压缩且不失唯一性,那么最终不会影响需要输出的结果。这里采用短链接的处理方式。(md5+16以上的进制模式)

TimeOutQueue:

最前面提到过,基本所有的外部对象状态变更都可以被捕获,然后产生一个事件,而超时事件必须是主动检查才可以判断,因此当对象数据量增加的时候,超时检查的消耗就会变成O(N),一般会推荐使用分区模式(时间轮盘,时间槽)来缩减N增加带来的影响,另一种方式比较适合于超时时间不会变动的情况,就好比将一个对象放入后,它的超时时间从创建初期到销毁都不会再改变,如果是这种情况,那么可以采用这个类的实现方式。

内置一个有顺序的单向链或者队列,按照超时时间的前后建立先后顺序,最早超时的对象放在最前面,内部线程每次从队列或者链的第一位开始检查,如果发现超时,则处理,继续往后走,当发现没有超时的时候,获得该对象距离超时时间的间隔,然后挂起这间隔的时间。期间有任何数据加入,如果超时时间小于队列第一个对象超时时间,则加入队列,然后唤醒检查线程(切记顺序不要反,先加入队列,再唤醒)。最后在增加一个防止队列为空的消费者生产者的标识,保证不要空循环。

Beatles小记(三)-分布式数据流分析中Master的横向扩展

读前先看:

这篇文章主要讲述的是beatles流式数据分析框架中对于master的横向扩展真实的设计演进,是beatles框架介绍的第三篇,看第三部分的时候如果看过前两篇文章,这篇文章的递进应该比较容易理解(如果看过前面的代码,那么会更深入的理解其中的细节,文字图片描述一分钟,代码写写1个周)。如果没有看过前两篇,那前提你要理解hadoop等常用的分布式分析系统再看,否则最后可能交流起来就有些空对空了,因为真的写了和用了就会有体会细节的差别。

废话不多说,看完后如果不看代码其实也很难体会细节,因此建议看完后看看代码,跑一下测试用例子(MasterSlaveIntegrationTest_SocketVersion是socket集成测试版本)。

下面文章中的mr表示MapReduce的意思。

Master的横向扩展:

尽管Beatles这种松散模式下Slave可以随时随地的加入集群,但由于最终数据还要汇总到Master,Master本身承担着类似于Hadoop中Reduce的角色,所以Master在版本迭代的过程中不断的对本身做着各种优化:并行模式下多线程的数据合并,主干数据分析周期的磁盘换入换出,支持Slave多任务合并后返回,数据导出广度遍历column存储结构等等(详细的参看第二篇文章)。当数据base真的非常大的时候(例如:对于用户纬度的统计,统计后结果还是非常多,无法hold在内存),开放平台分析系统给数据需求方提供的解决方案是片段性输出,交由外部再次合并这些片段。(这是基于当时间片段足够小的时候,数据片内容可以被承受)但结果是让外部数据使用者利用数据库去对这些结果做二次归并。这不仅给数据结果使用者带来了问题,也使得Master随时会被最后一根稻草压倒(如果时间片数据依然无法被hold)。

下面看看老的结构图:

图1 Master横向扩展前结构图

1. master从不同的jobs来源构建需要处理的分析任务,并拆分为多个task是。

2. slave请求任务(一个或者多个)去做分析处理。

3. slave获得任务后从任务描述中获得数据来源,分析规则,输出,开始从数据来源增量获取数据进行分析。

4. slave根据任务定义判断多个任务结果是否可以合并,并将合并后的结果发回给master。至此slave的一轮业务生命周期结束,再从步骤3开始重复。

5. master收到各个slave的数据,开始多线程并行合并job结果,最终判断某一个job的task是否都已经完成,如果完成开始导出数据和临时文件,重置job开始下一轮的job执行。

思考:

是否增加一个角色:reducer来替代掉master的工作?

其实slave一次能够获取多个jobtask,然后自我合并,就是一种比较弱的reduce的做法。

否定了新增一个reducer的原因:新增角色增加了管理的复杂度和集群扩展性。(Hadoop就直接用slave作为reduce)

如果用slave来完全承担所有的reduce工作?

1. 破坏了原来master不管理slave集群的基本原则,slave动态扩展非常麻烦,同时需要增加心跳管理和各种调度算法,任务管理来完成结果的合并。(最后就是一个hadoop的设计)

2. 不考虑用文件作为数据交互的方式(因为流式分析的特点:片段性数据量不大,及时性要求高,所以最好直接是内存),因此hadoop最亮眼的hdfs没有用到,hadoop设计将会大打折扣。

如果用master继续做reduce,是否可以考虑横向扩展master?

Master的职责:

1. 从任务源(可能是本地配置文件或者db或者是http服务)获得jobs定义构建任务池。

2. 被动分配多个job的tasks。

3. 管理job状态以及jobtask的状态。

4. 根据jobtask状态合并slave返回结果到job各个主干上。

5. 根据job状态导出job每一轮的中间结果和统计结果。

6. 根据job状态判断是否重置job。

会发现其实master归根到底就是对job的管理以及对job中数据结果的合并和导出,而最为消耗的就是类似与reducer的4,5两个步骤。

先介绍一下job和jobtask的概念,利于对后面的拆分有更好的理解:

job包含了一组jobtask,job本身定义了一组mr规则(类似有很多mr处理实现),定义了要处理的数据来源(其他信息参看代码)。

Jobtask是将job的多个数据来源拆分后得到的一个子任务,也就是每一个数据来源和同一组mr成为了一个任务,在slave获得一个或者多个task的时候,可以自己通过数据来源去获取数据,然后根据一组规则直接对流式数据做大量的mr(这也是和最早hadoop自己写mr的差别,其实数据的多次移动和读入才是计算集群的最大成本),但最终所有的jobtask都要合并到job的结果主干上,最后导出临时结果和报表数据。

a. 如果等价部署多个master,所有slave连接一组master,来做master横向扩展会如何?

a) 任务管理就需要多节点之间做并发控制,当前可以看看master内的代码是一个进程内做并发控制。这种方式过于复杂,带来的消耗远大于收益

b. 如果等价部署多个master,所有slave连接一组master,但是将job或者jobtask按照业务(上面说过job就是定义了多个mr的实现,要拆分也只能将mr分组放到不同的master上来减轻mr产生的<k,v>对存储带来的压力)分摊到多个master上,即不用对任务管理做并发控制,又可以分担reduce的工作。

a) slave主动请求任务,如何判断应该优先向谁请求任务?任务负载均衡如何做?最后可能还是单独部署多套集群来的靠谱。

b) 将不同的mr放在多个master可行,但结果就和hadoop独立的写mr带来的结果一样,对同一份数据来源处理,却因为mr的分组数据被反复读入和移动。

c. 和Zookeeper类似,master建立group,但是只有一台负责1,2,3,6的工作,而4,5则可以扩展到多个master上。需要对原来系统的改造为:

a) 多个master job池构建来源保持一致,构建完毕增加mr与master的对应关系(根据算法实现平均分配,后面介绍关于分配的算法,注意只有主的那台master接受任务分发请求,负责将mr与master建立对应关系在task中传递给slave)

b) slave从一台master上获取任务,分析后将结果按照mr分组(执行的Task中带有,所有设计都是这样,slave不保留任何带有业务性或者集群管理性的配置,保证slave随时离场,随时加入),将分组后的mr结果并行的发送到多个master上。

c) master在合并和输出结果的时候判断自己所负责的mr部分,按需合并和存储(虽然在slave中已经有做业务数据分组)。

会发现多个master好像多台流水线一样,保持着相同的动作和周期性,从同一个slave获取到了不同原始材料以后,开始制作零件,然后以匹配的速度来将不同零件丢到一个筐子里交给后续处理者。

当然你会考虑到还有容错,master集群动态扩展,速度不匹配等问题,后续细节慢慢介绍。

基于上面所描述的情况,结构演变成如下:

图2 横向扩展后的结构图

可以看到Slave现在获取任务还是集中在一台,但是返回任务结果会支持分散到多台master,解决master瓶颈最大的问题。同时master组的jobs来源保持一致,作为横向扩展的基础(主master分组mr简化master的协同,其他master没有获得数据就没有结果输出)

细节:

1. master group之间不需要通信。(主要是业务分拆信息,可以通过幂等算法,也可以通过单机分配,分析结果过滤投递的方式),当前主要是用分析结果过滤投递来保证。

2. 平均分配算法。

首先master有权重(有实体机器,也有虚拟机,处理能力不同),其次mr的权重也不同(有对app做简单统计的,有对用户做统计的,数据量相差非常大)。当前考虑一个mr一台虚拟机抗不论多少数据都能抗的住,如果扛不住后续可以再根据mr产生的结果分(这会增加分流数据的消耗),其实可以看作现在是分库,以后就是分表。分配算法其实就是在两边都有权重的情况下做任务分配,且任务不可切割。

采用了两个排序队列,然后按照简单的饥饿+权重方式来分配,处理者根据当前获得的饥饿感排序,越饥饿的放在越前面(饥饿感=已分配任务/自身权重),当已分配任务为0的时候饥饿感=1/自身权重(保证能力最强的优先获得最大的任务)。任务按照权重排序,高权重的排在最前面。分配过程如下:

1. 获的当前任务队列最前面的任务(权重最高的任务)

2. 将任务分配给处理者队列中饥饿感最强的节点。

3. 节点吃了任务以后会再次按照处理者队列排序。

4. 循环到1再次分配任务,直到任务分配结束。

能看到的就是其实就是有一个评判饿的标准,按照资源权重高低来分配,最后平均分配资源。(也许各位会有更好的建议和算法)

这个算法在保证两个队列构建时始终一致性的情况下,能够变成等幂分配算法,其实也就是当两个队列中如果遇到评判标准相等的时候排序是否会有前后变化,如果没有,那么任何一个master都会产出相同的分配结果。(具体可以参看代码,在ReportUtil中)

3. Master的容灾。由于master其实不是等价集群的模式,因此master的不可靠会导致数据丢失。

a) Slave如果发送错误,则会尝试再发送一次,如果两次错误,则将master和对应的job作为文件名保存这次隶属于这个job的tasks结果到文件中(append 进文件)。

b) Master如果恢复的话,可以通过脚本将这些文件复制到master的目录下,master后台线程载入数据合并到主干。

c) 过程中如果master恢复,后续的消息将会投递到master,因此不会再写这个文件,因此文件不会需要有一个增量拷贝的过程,同时master也可以在后台线程慢慢恢复合并,最后使得数据保持一致性。

d) 当前还是处于半自动模式,后续可以考虑让slave后台线程将数据发送到恢复了的master上。

e) 中间可能损失部分时间片数据。

4. mr的动态增加或者减少(随时随地可以针对流式数据产出新的统计分析结果)。原来的集群就支持mr的动态增加,因为都是配置文件修改,重新载入翻译一下即可(统计模型被抽象后mr就可以穷举了,具体参看前面的文档)。当前因为master是多个,同时开始的时候就做好了mr与master的对应关系,因此增加或者减少如果从新做mr与master的分配会产生数据迁移的需求,因此对于mr的增加只是将变化部分再对master group做一次分配(假设原来那些mr分配是均匀的,现在归零master再分配,大志结果也是均匀的),对于mr的减少,只是消除掉task中的定义,mr与master对应关系不消除,避免后面要恢复带来的数据迁移。

5. master的动态增加和减少。这个不可避免的要做数据迁移,当前做法是每天是所有job重置的周期,增加和减少master将在那个时候对整个集群做停机,然后启动集群做重新编译,从今天起点开始分析,追赶数据。以后可以考虑如何做到不停机扩容master。(主要就是数据迁移)

6. 后续考虑做master与mr的冗余分配,既可以保证数据可靠性(多份数据分析存储),又可以方便扩容(数据迁移成本可以间接降低)

一些感受

最后master的简单横向扩展模式,使得数据分片,一定程度上得到了数据安全性的保证,同时对于非常消耗cpu和memory的reduce被简单的拆分开来,为业务发展提供了突破,最重要的是系统依然保持最初的设计原则和目的。

任何系统都有自己的适用场景,不要因为要做一个大而全的系统,而丧失了自己设计的原则,我们很难做一个hadoop,但是我们要的也并不是一个通用的hadoop,找到业务场景的特点,才能够用最简单高效的设计方式来完成业务不断增长带来的技术挑战。

另,在设计过程中时不时有同学问我要不要引入zookeeper,过程中曾考虑过,但最后还是觉得要解决了根本问题以后再引入,因为它只是一个工具,便于管理的工具。就像我们要求代码能够做单元测试一样,如果没有zookeeper是否你的系统就无法run,小到模块,大到系统,接口化设计就是要屏蔽实现对系统设计后期可扩展,稳定性的影响,所有系统最后都能够退化为文件交换方式的处理模式,因此如果能够用文件交换可以实现的了的话,你的系统就是最ok的了。(你可以发现linux这么伟大的操作系统就是如此,一切文件化,一切接口化,简单就是美,这个美需要不断的坚持自己的原则)


相关文章

企业架构、TOGAF与ArchiMate概览
架构师之路-如何做好业务建模?
大型网站电商网站架构案例和技术架构的示例
完整的Archimate视点指南(包括示例)
相关文档

数据中台技术架构方法论与实践
适用ArchiMate、EA 和 iSpace进行企业架构建模
Zachman企业架构框架简介
企业架构让SOA落地
相关课程

云平台与微服务架构设计
中台战略、中台建设与数字商业
亿级用户高并发、高可用系统架构
高可用分布式架构设计与实践

 
分享到
 
 
     


专家视角看IT与架构
软件架构设计
面向服务体系架构和业务组件
人人网移动开发架构
架构腐化之谜
谈平台即服务PaaS


面向应用的架构设计实践
单元测试+重构+设计模式
软件架构师—高级实践
软件架构设计方法、案例与实践
嵌入式软件架构设计—高级实践
SOA体系结构实践


锐安科技 软件架构设计方法
成都 嵌入式软件架构设计
上海汽车 嵌入式软件架构设计
北京 软件架构设计
上海 软件架构设计案例与实践
北京 架构设计方法案例与实践
深圳 架构设计方法案例与实践
嵌入式软件架构设计—高级实践
更多...