一 Storm简介
Storm是Twitter开源的一个类似于Hadoop的实时数据处理框架,它原来是由BackType开发,后BackType被Twitter收购,将Storm作为Twitter的实时数据分析系统。
实时数据处理的应用场景很广泛,例如商品推荐,广告投放,它能根据当前情景上下文(用户偏好,地理位置,已发生的查询和点击等)来估计用户点击的可能性并实时做出调整。
twitter列举了storm的三大作用领域:
1.信息流处理(Stream Processing)
Storm可以用来实时处理新数据和更新数据库,兼具容错性和可扩展性,它可以用来处理源源不断的消息,并将处理之后的结果保存到持久化介质中。
2.连续计算(Continuous Computation)
Storm可以进行连续查询并把结果即时反馈给客户,比如将Twitter上的热门话题发送到客户端。
3.分布式远程过程调用(Distributed RPC)
除此之外,Storm也被广泛用于以下方面:
精确的广告推送 在用户浏览产品的时候,将浏览记录实时性的搜集,发送到Bolt,由Bolt来根据用户的账户信息(如果有的话)完成产品的分类统计,产品的相关性查询等逻辑计算之后,将计算结果推送给用户;
实时日志的处理 Storm可以和一个分布式存储结合起来,实时性的从多个数据源发送数据到处理逻辑Bolts,Bolts完成一些逻辑处理之后,交给分布式存储框架进行存储,此时,Spout可以是多个;
Storm可以用来并行处理密集查询,Storm的拓扑结构是一个等待调用信息的分布函数,当它收到一条调用信息后,会对查询进行计算,并返回查询结果
二 Storm 集群的基本组件
Storm是一个分布式、高容错的实时计算系统,Storm对于实时计算的意义相当于Hadoop对于批处理的意义。Hadoop提供了Map和Reduce原语,使对数据进行批处理变的非常简单和优美。同样,Storm也对数据的实时计算提供了简单Spout和Bolt原语。
Storm 集群表面上看和hadoop集群非常像,但是在Hadoop上面运行的是MapReduce的Job,
而在Storm上面运行的是Topology(拓扑),它们是非常不一样的 —关键的区别是: 一个MapReduce
Job最终会结束, 而一个Topology永远运行(除非显式的杀掉它)。
Storm集群里面有两种节点: 控制节点(master node)和工作节点(worker node)
控制节点上面运行一个后台程序: Nimbus, 它的作用类似Hadoop里面的JobTracker。Nimbus负责在集群里面分布代码,分配工作给机器,
并且监控状态。
每一个工作节点上面运行一个叫做Supervisor的节点(类似 TaskTracker)。Supervisor会监听分配给它那台机器的工作,根据需要启动/关闭worker工作进程。
每一个工作进程执行一个Topology(类似 Job)的一个子集;一个运行的Topology由运行在很多机器上的多个工作进程
Worker(类似 Child)组成。
Storm topology 结构
Storm VS MapReduce
Nimbus和Supervisor之间的所有协调工作都是通过一个Zookeeper集群来完成,并且Nimbus进程和supervisor都是快速失败(fail-fast)和无状态的,所有的状态要么在Zookeeper里面,
要么在本地磁盘上。这也就意味着可以用kill -9来杀死Nimbus和Supervisor进程, 然后再重启它们,它们可以继续工作,
就好像什么都没有发生过似的,这个设计使得storm不可思议的稳定。
Topologies -- 作业拓扑
为了在Storm上面做实时计算, 要去建立一些topologies。一个topology就是一个计算节点所组成的图。Topology里面的每个处理节点都包含处理逻辑,
而节点之间的连接则表示数据流动的方向。
运行一个topology是很简单的。首先,把你所有的代码以及所依赖的jar打进一个jar包。然后运行类似下面的这个命令:
strom jar all-your-code.jar backtype.storm.MyTopology arg1 arg2 |
这个命令会运行主类: backtype.strom.MyTopology, 参数是arg1, arg2。这个类的main函数定义这个topology并且把它提交给Nimbus。storm
jar负责连接到nimbus并且上传jar文件。
因为topology的定义其实就是一个Thrift结构并且Nimbus就是一个Thrift服务,可以用任何语言创建并且提交topology。上面的方法是用JVM-based语言提交的最简单的方法,
看一下文章: 在生产集群上运行topology去看看怎么启动以及停止topologies。
spout和bolt所组成一个网络会被打包成topology, topology是storm里面最高一级的抽象(类似
Job)。
注:Thrift 是一个软件框架,用来进行可扩展且跨语言的服务的开发。它结合了功能强大的软件堆栈和代码生成引擎,以构建在
C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell,
C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml
这些编程语言间无缝结合的、高效的服务。
Stream -- 数据流
Stream是storm里面的关键抽象,一个stream是一个没有边界的tuple序列。storm提供一些原语来分布式地、可靠地把一个stream传输进一个新的stream。比如:
你可以把一个tweets流传输到热门话题的流。
Storm提供的最基本的处理stream的原语是spout和bolt。可以实现Spout和Bolt对应的接口以处理你应用的逻辑。
spout -- 流的源头
比如一个spout可能从Kestrel队列里面读取消息并且把这些消息发射成一个流。又比如一个spout可以调用twitter的一个api并且把返回的tweets发射成一个流。
通常Spout会从外部数据源(队列、数据库等)读取数据,然后封装成Tuple形式,之后发送到Stream中。Spout是一个主动的角色,在接口内部有个nextTuple函数,Storm框架会不停的调用该函数。
注: Kestrel是一个scala写的twitter开源的消息中间件,特点是高性能、小巧(2K行代码)、持久存储(记录日志到journal)并且可靠(支持可靠获取)。Kestrel的前身是Ruby写的Starling项目,后来twitter的开发人员尝试用scala重新实现。
Bolt -- 处理逻辑
负责处理输入的Stream,并产生新的输出Stream。Bolt可以执行过滤、函数操作、Join、操作数据库等任何操作。Bolt是一个被动的角色,其接口中有一个execute(Tuple
input)方法,在接收到消息之后会调用此函数,用户可以在此方法中执行自己的处理逻辑。bolt可以接收任意多个输入stream,
作一些处理, 有些bolt可能还会发射一些新的stream。一些复杂的流转换, 比如从一些tweet里面计算出热门话题,
需要多个步骤, 从而也就需要多个bolt。 Bolt可以做任何事情: 运行函数、过滤tuple、 聚合、
合并以及访问数据库等等。
Spout 与 Bolt
注:1.topology里面的每一个节点都是并行运行的。 在topology里面,
可以指定每个节点的并行度, storm则会在集群里面分配多个线程来同时计算。
2.一个topology会一直运行直到显式停止它。storm自动重新分配一些运行失败的任务,
并且storm保证你不会有数据丢失, 即使在一些机器意外停机并且消息被丢掉的情况下。
运行中的Topology主要由以下三个组件组成的:Worker processes、
Executors threads以及Tasks
它们的数量关系如下图所示:
Spout或者Bolt的Task个数一旦指定之后就不能改变了,而Executor的数量可以根据情况来进行动态的调整。默认情况下#
executor = #tasks即一个Executor中运行着一个Task。
三 数据模型
storm使用tuple来作为它的数据模型。每个tuple是一堆值,每个值有一个名字,并且每个值可以是任何类型,
一个tuple可以看作一个没有方法的java对象。总体来看,storm支持所有的基本类型、字符串以及字节数组作为tuple的值类型。也可以使用自己定义的类型来作为值类型,
只要实现对应的序列化器(serializer)。
一个Tuple代表数据流中的一个基本的处理单元,例如一条cookie日志,它可以包含多个Field,每个Field表示一个属性。
一个没有边界的、源源不断的、连续的Tuple序列就组成了Stream。Tuple本来应该是一个Key-Value的Map,由于各个组件间传递的tuple的字段名称已经事先定义好了,所以Tuple只需要按序填入各个Value,所以就是一个Value
List。
四 流分组策略(Stream grouping)
流分组策略告诉topology如何在两个组件之间发送tuple。 要记住,
spouts和bolts以很多task的形式在topology里面同步执行。如果从task的粒度来看一个运行的topology,
它应该是这样的:
从task角度来看topology
当Bolt A的一个task要发送一个tuple给Bolt B, 它应该发送给Bolt
B的哪个task呢?
stream grouping 专门回答这种问题的。在我们深入研究不同的stream
grouping之前, 让我们看一下storm-starter里面的另外一个topology。WordCountTopology读取一些句子,
输出句子里面每个单词出现的次数.
TopologyBuilder builder =newTopologyBuilder();
builder.setSpout(1,new RandomSentenceSpout(),5);
builder.setBolt(2,new SplitSentence(),8)
.shuffleGrouping(1);
builder.setBolt(3,new WordCount(),12)
.fieldsGrouping(2,newFields("word"));
|
SplitSentence对于句子里面的每个单词发射一个新的tuple,
WordCount在内存里面维护一个单词->次数的mapping, WordCount每收到一个单词,
它就更新内存里面的统计状态。
最简单的grouping是shuffle grouping, 它随机发给任何一个task。上面例子里面RandomSentenceSpout和SplitSentence之间用的就是shuffle
grouping, shuffle grouping对各个task的tuple分配的比较均匀。
一种更有趣的grouping是fields grouping, SplitSentence和WordCount之间使用的就是fields
grouping, 这种grouping机制保证相同field值的tuple会去同一个task, 这对于WordCount来说非常关键,如果同一个单词不去同一个task,
那么统计出来的单词次数就不对了。
注:fields grouping是stream合并,是stream聚合以及很多其它场景的基础。在背后呢,
fields grouping使用的一致性哈希来分配tuple。
Storm支持的组分配策略如下:
1.ShuffleGrouping:随机选择一个Task来发送。
2.FiledGrouping:根据Tuple中Fields来做一致性hash,相同hash值的Tuple被发送到相同的Task。
3.AllGrouping:广播发送,将每一个Tuple发送到所有的Task。
4.GlobalGrouping:所有的Tuple会被发送到某个Bolt中的id最小的那个Task。
5.NoneGrouping:不关心Tuple发送给哪个Task来处理,等价于ShuffleGrouping。
6.DirectGrouping:直接将Tuple发送到指定的Task来处理。
7.Storm的组分配策略的概念类似于MapReduce的Partition机制,通过使用一些分组策略原语来为Tuple设置路由。
五 小结
Storm这种高可拓展性,能处理高频数据和大规模数据的实时流计算解决方案将被应用于实时搜索,高频交易和社交网络上,其应用场景将会越来越广泛。 |