您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Model Center   Code  
会员   
   
 
     
   
 订阅
  捐助
storm学习
 
   次浏览      
 2018-2-27 
 
编辑推荐:
本文来自于博客园,本文通过本地测试代码,简单的介绍了storm的学习,希望对您的学习有帮助。

1.HADOOP与STORM比较

数据来源:HADOOP处理的是HDFS上TB级别的数据(历史数据),STORM是处理的是实时新增的某一笔数据(实时数据),处理一些简单的业务逻辑;

处理过程:HADOOP是分MAP阶段到REDUCE阶段,STORM是由用户定义处理流程,流程中可以包含多个步骤,每个步骤可以是数据源(SPOUT)或处理逻辑(BOLT);

是否结束:HADOOP最后是要结束的,STORM是没有结束状态,到最后一步时,就停在那,直到有新数据进入时再从头开始,(SPOUT一直循环nextTuple()方法,BOLT当有接受到消息就调用execute(Tuple input)方法);

处理速度:HADOOP是以处理HDFS上TB级别数据为目的,处理速度慢,STORM是只要处理新增的某一笔数据即可,可以做到很快;

适用场景:HADOOP是在要处理批量数据时用的,不讲究时效性,STORM是要处理某一新增数据时用的,要讲时效性;

2.Storm的设计思想

Storm是对流Stream的抽象,流是一个不间断的无界的连续tuple,注意Storm在建模事件流时,把流中的事件抽象为tuple即元组。

Storm将流中元素抽象为Tuple,一个tuple就是一个值列表value list,list中的每个value都有一个name,并且该value可以是基本类型,字符类型,字节数组等,当然也可以是其他可序列化的类型。

Storm认为每个stream都有一个stream源,也就是原始元组的源头,所以它将这个源头称为Spout。

有了源头即spout也就是有了stream,那么该如何处理stream内的tuple呢。将流的状态转换称为Bolt,bolt可以消费任意数量的输入流,只要将流方向导向该bolt,同时它也可以发送新的流给其他bolt使用,这样一来,只要打开特定的spout(管口)再将spout中流出的tuple导向特定的bolt,又bolt对导入的流做处理后再导向其他bolt或者目的地。

以上处理过程统称为Topology即拓扑。拓扑是storm中最高层次的一个抽象概念,它可以被提交到storm集群执行,一个拓扑就是一个流转换图,图中每个节点是一个spout或者bolt,图中的边表示bolt订阅了哪些流,当spout或者bolt发送元组到流时,它就发送元组到每个订阅了该流的bolt(这就意味着不需要我们手工拉管道,只要预先订阅,spout就会将流发到适当bolt上)。

拓扑的每个节点都要说明它所发射出的元组的字段的name,其他节点只需要订阅该name就可以接收处理。

3.流处理过程

4.Storm的基础概念

Topology : 相当于一个业务流程项目,相当于hadoop中MapperReduce中的job

Stream:消息流,是一个没有边界的tuple序列,这些tuples会被以一种分布式的方式并行地创建和处理

tuple:就是数据的单位,需要每一个需要处理的数据的封装在tuple中

Spouts 消息源,是消息生产者,他会从一个外部源读取数据并向topology里面面发出消息:tuple

Bolts 消息处理者,所有的消息处理逻辑被封装在bolts里面,处理输入的数据流并产生新的输出数据流,可执行过滤,聚合,查询数据库等操作

Task 每一个Spout和Bolt会被当作很多task在整个集群里面执行,每一个task对应到一个线程.

Stream groupings 消息分发策略,定义一个Topology的其中一步是定义每个tuple接受什么样的流作为输入,stream grouping就是用来定义一个stream应该如何分配给Bolts们.

5.本地测试代码

package stormNew;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LocalTopology {

public static void main(String[] args) {

//组装Topology
TopologyBuilder build = new TopologyBuilder();
//定义spout的id
build.setSpout("spout", new Spout());
//定义bolt的id,使用new Fields("field")字段进行分组
build.setBolt("bolt", new Bolt()).fieldsGrouping("spout", new Fields("field"));

try {
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalTopology", conf, build.createTopology());
// Config stormConf = new Config();
// stormConf.setNumWorkers(2);
// StormSubmitter.submitTopology("luluTology", stormConf,build.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
}

public static class Spout extends BaseRichSpout{

private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;

private int i;
private HashMap<Integer,Integer> map = new HashMap<Integer,Integer>();

//spout中循环这个方法,进行消息获取与发送
public void nextTuple() {
// System.err.println("Spout:"+i);
//作为每一条消息的唯一标识,用于ack的消息确认机制
int mgsid = i;
//发送tuple消息到bolt中处理
this.collector.emit(new Values(i++,i%3),mgsid);
//spout自身维护着消息与标识之间的关系
map.put(mgsid, i);
try {
//休眠一下,清晰看出效果
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.conf = conf;
this.context = context;
this.collector = collector;
}


public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
//定义发送字段的名称
outputFieldsDeclarer.declare(new Fields("num","field"));
}

@Override
//当bolt调用的ack方面时,回调spout中的ack方法
public void ack(Object msgId) {
System.out.println("确认信息-----------");
}

@Override
//当bolt调用的fail方面时,回调spout中的fail方法
public void fail(Object msgId) {
System.out.println("消息失败-----------"+map.get(msgId));
}



}


public static class Bolt extends BaseRichBolt{


private Map conf;
private TopologyContext context;
private OutputCollector collector;

private int sum = 0;

//bolt接受动spout发送过来的消息就调execute
public void execute(Tuple input) {
// TODO Auto-generated method stub
//通过字段名称来获取数据
int num = input.getIntegerByField("num");
System.err.println("--------------------num:"+(num));

/**
* 为了更好的看书ack消息确认机制的效果,所以直接调用ack与fail方法
*
* 默认该方法是这样进行ack的调用
* try{
* 业务逻辑
* this.collector.ack(input);
*}catch(){
* this.collector.fail(input);
*}
*/
if(num >=10 && num <=20){
//System.err.println("sum:"+(sum+=num));
this.collector.ack(input);
}else{
this.collector.fail(input);
}
}

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
// TODO Auto-generated method stub
this.conf = conf;
this.context = context;
this.collector = collector;
}

public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
// TODO Auto-generated method stub

}

}

}

6.Storm集群结构

主节点(Nimbus):Nimbus负责在集群范围内分发代码、为worker分配任务和故障监测

从节点(Supervisor):Supervisor监听分配给它所在机器的工作,基于Nimbus分配给它的事情来决定启动或停止工作者进程。每个工作者进程执行一个topology的子集(也就是一个子拓扑结构);一个运行中的topology由许多跨多个机器的工作者进程组成。

启动集群

在nimbus节点执行"nohup bin/storm nimbus >/dev/null 2>&1 &"启动Nimbus后台程序,并放到后台执行

在supervisor节点执行"nohup bin/storm supervisor >/dev/null 2>&1 &"启动Supervisor后台程序,并放到后台执行;

在nimbus节点执行"nohup bin/storm ui >/dev/null 2>&1 &"启动UI后台程序,并放到后台执行,启动后可以通过http://{nimbus host}:8080观察集群的worker资源使用情况、Topologies的运行状态等信息。

在所有节点执行"nohup bin/storm logviewer >/dev/null 2>&1 &"启动log后台程序,并放到后台执行,启动后可以通过http://{host}:8000观察日志信息。(nimbus节点可以不用启动logviewer进程,因为logviewer进程主要是为了方便查看任务的执行日志,这些执行日志都在supervisor节点上。)

停止作业

先查询作业列表storm list

命令行下执行storm kill TopologyName

在storm ui上点击kill按钮

7.并行度

一个节点上最多能够运行四个worker,是四个slots

1个worker进程执行的是1个topology的子集(注:不会出现1个worker为多个topology服务)。1个worker进程会启动1个或多个executor线程来执行1个topology的component(spout或bolt)。因此,1个运行中的topology就是由集群中多台物理机上的多个worker进程组成的。

executor是1个被worker进程启动的单独线程。每个executor只会运行1个topology的1个component(spout或bolt)的task(注:task可以是1个或多个,storm默认是1个component只生成1个task,executor线程会在每次循环里顺序调用所有task实例)。

task是最终运行spout或bolt中代码的执行单元(注:1个task即为spout或bolt的1个实例,executor线程在执行期间会调用该task的nextTuple或execute方法)。topology启动后,1个component(spout或bolt)的task数目是固定不变的,但该component使用的executor线程数可以动态调整(例如:1个executor线程可以执行该component的1个或多个task实例)。这意味着,对于1个component存在这样的条件:#threads<=#tasks(即:线程数小于等于task数目)。默认情况下task的数目等于executor线程数目,即1个executor线程只运行1个task。

默认情况下,一个supervisor节点会启动4个worker进程。每个worker进程会启动1个executor,每个executor启动1个task

提高并行度

worker(slots)

默认一个从节点上面可以启动4个worker进程,参数是supervisor.slots.port。在storm配置文件中已经配置过了,默认是在strom-core.jar包中的defaults.yaml中配置的有。

默认一个strom项目只使用一个worker进程,可以通过代码来设置使用多个worker进程。

通过config.setNumWorkers(workers)设置

通过conf.setNumAckers(0);可以取消acker任务

最好一台机器上的一个topology只使用一个worker,主要原因是减少了worker之间的数据传输

如果worker使用完的话再提交topology就不会执行,会处于等待状态

executor

默认情况下一个executor运行一个task,可以通过在代码中设置

builder.setSpout(id, spout, parallelism_hint);

builder.setBolt(id, bolt, parallelism_hint);

task

通过boltDeclarer.setNumTasks(num);来设置实例的个数

executor的数量会小于等于task的数量(为了rebalance)

弹性计算

通过代码调整

通过代码调整
topologyBuilder.setBolt("green-bolt", new GreenBolt(),2)
.setNumTasks(4).shuffleGrouping("blue-spout);
通过shell调整
# 10秒之后开始调整
# Reconfigure the topology "mytopology" to use 5 worker processes,
# the spout "blue-spout" to use 3 executors and
# the bolt "yellow-bolt" to use 10 executors.
storm rebalance mytopology -w 10 -n 5 -e blue-spout=3 -e yellow-bolt=10

-w 代表几秒后开始执行

-n 代表几个worker

-e 代表几个excutor

stream grouping分类

Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证bolt中的每个任务接收到的tuple数目相同.(它能实现较好的负载均衡)

Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到同一任务, 而不同的userid则会被分配到不同的任务

All Grouping: 广播发送,对于每一个tuple,Bolts中的所有任务都会收到.

   
次浏览       
相关文章

基于EA的数据库建模
数据流建模(EA指南)
“数据湖”:概念、特征、架构与案例
在线商城数据库系统设计 思路+效果
 
相关文档

Greenplum数据库基础培训
MySQL5.1性能优化方案
某电商数据中台架构实践
MySQL高扩展架构设计
相关课程

数据治理、数据架构及数据标准
MongoDB实战课程
并发、大容量、高性能数据库设计与优化
PostgreSQL数据库实战培训