编辑推荐: |
本文来自于博客园,本文通过本地测试代码,简单的介绍了storm的学习,希望对您的学习有帮助。
|
|
1.HADOOP与STORM比较
数据来源:HADOOP处理的是HDFS上TB级别的数据(历史数据),STORM是处理的是实时新增的某一笔数据(实时数据),处理一些简单的业务逻辑;
处理过程:HADOOP是分MAP阶段到REDUCE阶段,STORM是由用户定义处理流程,流程中可以包含多个步骤,每个步骤可以是数据源(SPOUT)或处理逻辑(BOLT);
是否结束:HADOOP最后是要结束的,STORM是没有结束状态,到最后一步时,就停在那,直到有新数据进入时再从头开始,(SPOUT一直循环nextTuple()方法,BOLT当有接受到消息就调用execute(Tuple
input)方法);
处理速度:HADOOP是以处理HDFS上TB级别数据为目的,处理速度慢,STORM是只要处理新增的某一笔数据即可,可以做到很快;
适用场景:HADOOP是在要处理批量数据时用的,不讲究时效性,STORM是要处理某一新增数据时用的,要讲时效性;
2.Storm的设计思想
Storm是对流Stream的抽象,流是一个不间断的无界的连续tuple,注意Storm在建模事件流时,把流中的事件抽象为tuple即元组。
Storm将流中元素抽象为Tuple,一个tuple就是一个值列表value list,list中的每个value都有一个name,并且该value可以是基本类型,字符类型,字节数组等,当然也可以是其他可序列化的类型。
Storm认为每个stream都有一个stream源,也就是原始元组的源头,所以它将这个源头称为Spout。
有了源头即spout也就是有了stream,那么该如何处理stream内的tuple呢。将流的状态转换称为Bolt,bolt可以消费任意数量的输入流,只要将流方向导向该bolt,同时它也可以发送新的流给其他bolt使用,这样一来,只要打开特定的spout(管口)再将spout中流出的tuple导向特定的bolt,又bolt对导入的流做处理后再导向其他bolt或者目的地。
以上处理过程统称为Topology即拓扑。拓扑是storm中最高层次的一个抽象概念,它可以被提交到storm集群执行,一个拓扑就是一个流转换图,图中每个节点是一个spout或者bolt,图中的边表示bolt订阅了哪些流,当spout或者bolt发送元组到流时,它就发送元组到每个订阅了该流的bolt(这就意味着不需要我们手工拉管道,只要预先订阅,spout就会将流发到适当bolt上)。
拓扑的每个节点都要说明它所发射出的元组的字段的name,其他节点只需要订阅该name就可以接收处理。
3.流处理过程
4.Storm的基础概念
Topology : 相当于一个业务流程项目,相当于hadoop中MapperReduce中的job
Stream:消息流,是一个没有边界的tuple序列,这些tuples会被以一种分布式的方式并行地创建和处理
tuple:就是数据的单位,需要每一个需要处理的数据的封装在tuple中
Spouts 消息源,是消息生产者,他会从一个外部源读取数据并向topology里面面发出消息:tuple
Bolts 消息处理者,所有的消息处理逻辑被封装在bolts里面,处理输入的数据流并产生新的输出数据流,可执行过滤,聚合,查询数据库等操作
Task 每一个Spout和Bolt会被当作很多task在整个集群里面执行,每一个task对应到一个线程.
Stream groupings 消息分发策略,定义一个Topology的其中一步是定义每个tuple接受什么样的流作为输入,stream
grouping就是用来定义一个stream应该如何分配给Bolts们.
5.本地测试代码
package
stormNew;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class LocalTopology {
public static void main(String[] args) {
//组装Topology
TopologyBuilder build = new TopologyBuilder();
//定义spout的id
build.setSpout("spout", new Spout());
//定义bolt的id,使用new Fields("field")字段进行分组
build.setBolt("bolt", new Bolt()).fieldsGrouping("spout",
new Fields("field"));
try {
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalTopology",
conf, build.createTopology());
// Config stormConf = new Config();
// stormConf.setNumWorkers(2);
// StormSubmitter.submitTopology("luluTology",
stormConf,build.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
}
public static class Spout extends BaseRichSpout{
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;
private int i;
private HashMap<Integer,Integer> map =
new HashMap<Integer,Integer>();
//spout中循环这个方法,进行消息获取与发送
public void nextTuple() {
// System.err.println("Spout:"+i);
//作为每一条消息的唯一标识,用于ack的消息确认机制
int mgsid = i;
//发送tuple消息到bolt中处理
this.collector.emit(new Values(i++,i%3),mgsid);
//spout自身维护着消息与标识之间的关系
map.put(mgsid, i);
try {
//休眠一下,清晰看出效果
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.conf = conf;
this.context = context;
this.collector = collector;
}
public void declareOutputFields(OutputFieldsDeclarer
outputFieldsDeclarer) {
//定义发送字段的名称
outputFieldsDeclarer.declare(new Fields("num","field"));
}
@Override
//当bolt调用的ack方面时,回调spout中的ack方法
public void ack(Object msgId) {
System.out.println("确认信息-----------");
}
@Override
//当bolt调用的fail方面时,回调spout中的fail方法
public void fail(Object msgId) {
System.out.println("消息失败-----------"+map.get(msgId));
}
}
public static class Bolt extends BaseRichBolt{
private Map conf;
private TopologyContext context;
private OutputCollector collector;
private int sum = 0;
//bolt接受动spout发送过来的消息就调execute
public void execute(Tuple input) {
// TODO Auto-generated method stub
//通过字段名称来获取数据
int num = input.getIntegerByField("num");
System.err.println("--------------------num:"+(num));
/**
* 为了更好的看书ack消息确认机制的效果,所以直接调用ack与fail方法
*
* 默认该方法是这样进行ack的调用
* try{
* 业务逻辑
* this.collector.ack(input);
*}catch(){
* this.collector.fail(input);
*}
*/
if(num >=10 && num <=20){
//System.err.println("sum:"+(sum+=num));
this.collector.ack(input);
}else{
this.collector.fail(input);
}
}
public void prepare(Map conf, TopologyContext
context, OutputCollector collector) {
// TODO Auto-generated method stub
this.conf = conf;
this.context = context;
this.collector = collector;
}
public void declareOutputFields(OutputFieldsDeclarer
outputFieldsDeclarer) {
// TODO Auto-generated method stub
}
}
} |
6.Storm集群结构
主节点(Nimbus):Nimbus负责在集群范围内分发代码、为worker分配任务和故障监测
从节点(Supervisor):Supervisor监听分配给它所在机器的工作,基于Nimbus分配给它的事情来决定启动或停止工作者进程。每个工作者进程执行一个topology的子集(也就是一个子拓扑结构);一个运行中的topology由许多跨多个机器的工作者进程组成。
启动集群
在nimbus节点执行"nohup bin/storm nimbus >/dev/null
2>&1 &"启动Nimbus后台程序,并放到后台执行
在supervisor节点执行"nohup bin/storm supervisor >/dev/null
2>&1 &"启动Supervisor后台程序,并放到后台执行;
在nimbus节点执行"nohup bin/storm ui >/dev/null
2>&1 &"启动UI后台程序,并放到后台执行,启动后可以通过http://{nimbus
host}:8080观察集群的worker资源使用情况、Topologies的运行状态等信息。
在所有节点执行"nohup bin/storm logviewer >/dev/null
2>&1 &"启动log后台程序,并放到后台执行,启动后可以通过http://{host}:8000观察日志信息。(nimbus节点可以不用启动logviewer进程,因为logviewer进程主要是为了方便查看任务的执行日志,这些执行日志都在supervisor节点上。)
停止作业
先查询作业列表storm list
命令行下执行storm kill TopologyName
在storm ui上点击kill按钮
7.并行度
一个节点上最多能够运行四个worker,是四个slots
1个worker进程执行的是1个topology的子集(注:不会出现1个worker为多个topology服务)。1个worker进程会启动1个或多个executor线程来执行1个topology的component(spout或bolt)。因此,1个运行中的topology就是由集群中多台物理机上的多个worker进程组成的。
executor是1个被worker进程启动的单独线程。每个executor只会运行1个topology的1个component(spout或bolt)的task(注:task可以是1个或多个,storm默认是1个component只生成1个task,executor线程会在每次循环里顺序调用所有task实例)。
task是最终运行spout或bolt中代码的执行单元(注:1个task即为spout或bolt的1个实例,executor线程在执行期间会调用该task的nextTuple或execute方法)。topology启动后,1个component(spout或bolt)的task数目是固定不变的,但该component使用的executor线程数可以动态调整(例如:1个executor线程可以执行该component的1个或多个task实例)。这意味着,对于1个component存在这样的条件:#threads<=#tasks(即:线程数小于等于task数目)。默认情况下task的数目等于executor线程数目,即1个executor线程只运行1个task。
默认情况下,一个supervisor节点会启动4个worker进程。每个worker进程会启动1个executor,每个executor启动1个task
提高并行度
worker(slots)
默认一个从节点上面可以启动4个worker进程,参数是supervisor.slots.port。在storm配置文件中已经配置过了,默认是在strom-core.jar包中的defaults.yaml中配置的有。
默认一个strom项目只使用一个worker进程,可以通过代码来设置使用多个worker进程。
通过config.setNumWorkers(workers)设置
通过conf.setNumAckers(0);可以取消acker任务
最好一台机器上的一个topology只使用一个worker,主要原因是减少了worker之间的数据传输
如果worker使用完的话再提交topology就不会执行,会处于等待状态
executor
默认情况下一个executor运行一个task,可以通过在代码中设置
builder.setSpout(id, spout, parallelism_hint);
builder.setBolt(id, bolt, parallelism_hint);
task
通过boltDeclarer.setNumTasks(num);来设置实例的个数
executor的数量会小于等于task的数量(为了rebalance)
弹性计算
通过代码调整
通过代码调整
topologyBuilder.setBolt("green-bolt",
new GreenBolt(),2)
.setNumTasks(4).shuffleGrouping("blue-spout);
通过shell调整
# 10秒之后开始调整
# Reconfigure the topology "mytopology"
to use 5 worker processes,
# the spout "blue-spout" to use 3
executors and
# the bolt "yellow-bolt" to use 10
executors.
storm rebalance mytopology -w 10 -n 5 -e blue-spout=3
-e yellow-bolt=10 |
-w 代表几秒后开始执行
-n 代表几个worker
-e 代表几个excutor
stream grouping分类
Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证bolt中的每个任务接收到的tuple数目相同.(它能实现较好的负载均衡)
Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到同一任务,
而不同的userid则会被分配到不同的任务
All Grouping: 广播发送,对于每一个tuple,Bolts中的所有任务都会收到. |