求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Modeler   Code  
会员   
 
  
 
 
     
   
分享到
使用Storm实现实时大数据分析!
 

作者:Shruthi Kumar、Siddharth Patankar ,发布于2012-12-28,来源:Dr.Dobb's

 

简单和明了,Storm让大数据分析变得轻松加愉快。

当今世界,公司的日常运营经常会生成TB级别的数据。数据来源囊括了互联网装置可以捕获的任何类型数据,网站、社交媒体、交易型商业数据以及其它商业环境中创建的数据。考虑到数据的生成量,实时处理成为了许多机构需要面对的首要挑战。我们经常用的一个非常有效的开源实时计算工具就是Storm —— Twitter开发,通常被比作“实时的Hadoop”。然而Storm远比Hadoop来的简单,因为用它处理大数据不会带来新老技术的交替。

Shruthi Kumar、Siddharth Patankar共同效力于Infosys,分别从事技术分析和研发工作。本文详述了Storm的使用方法,例子中的项目名称为“超速报警系统(Speeding Alert System)”。我们想实现的功能是:实时分析过往车辆的数据,一旦车辆数据超过预设的临界值 —— 便触发一个trigger并把相关的数据存入数据库。

Storm

对比Hadoop的批处理,Storm是个实时的、分布式以及具备高容错的计算系统。同Hadoop一样Storm也可以处理大批量的数据,然而Storm在保证高可靠性的前提下还可以让处理进行的更加实时;也就是说,所有的信息都会被处理。Storm同样还具备容错和分布计算这些特性,这就让Storm可以扩展到不同的机器上进行大批量的数据处理。他同样还有以下的这些特性:

  • 易于扩展。对于扩展,你只需要添加机器和改变对应的topology(拓扑)设置。Storm使用Hadoop Zookeeper进行集群协调,这样可以充分的保证大型集群的良好运行。
  • 每条信息的处理都可以得到保证。
  • Storm集群管理简易。
  • Storm的容错机能:一旦topology递交,Storm会一直运行它直到topology被废除或者被关闭。而在执行中出现错误时,也会由Storm重新分配任务。
  • 尽管通常使用Java,Storm中的topology可以用任何语言设计。

当然为了更好的理解文章,你首先需要安装和设置Storm。需要通过以下几个简单的步骤:

  • 从Storm官方下载Storm安装文件
  • 将bin/directory解压到你的PATH上,并保证bin/storm脚本是可执行的。

Storm组件

Storm集群主要由一个主节点和一群工作节点(worker node)组成,通过 Zookeeper进行协调。

主节点:

主节点通常运行一个后台程序 —— Nimbus,用于响应分布在集群中的节点,分配任务和监测故障。这个很类似于Hadoop中的Job Tracker。

工作节点:

工作节点同样会运行一个后台程序 —— Supervisor,用于收听工作指派并基于要求运行工作进程。每个工作节点都是topology中一个子集的实现。而Nimbus和Supervisor之间的协调则通过Zookeeper系统或者集群。

Zookeeper

Zookeeper是完成Supervisor和Nimbus之间协调的服务。而应用程序实现实时的逻辑则被封装进Storm中的“topology”。topology则是一组由Spouts(数据源)和Bolts(数据操作)通过Stream Groupings进行连接的图。下面对出现的术语进行更深刻的解析。

Spout:

简而言之,Spout从来源处读取数据并放入topology。Spout分成可靠和不可靠两种;当Storm接收失败时,可靠的Spout会对tuple(元组,数据项组成的列表)进行重发;而不可靠的Spout不会考虑接收成功与否只发射一次。而Spout中最主要的方法就是nextTuple(),该方法会发射一个新的tuple到topology,如果没有新tuple发射则会简单的返回。

Bolt:

Topology中所有的处理都由Bolt完成。Bolt可以完成任何事,比如:连接的过滤、聚合、访问文件/数据库、等等。Bolt从Spout中接收数据并进行处理,如果遇到复杂流的处理也可能将tuple发送给另一个Bolt进行处理。而Bolt中最重要的方法是execute(),以新的tuple作为参数接收。不管是Spout还是Bolt,如果将tuple发射成多个流,这些流都可以通过declareStream()来声明。

Stream Groupings:

Stream Grouping定义了一个流在Bolt任务间该如何被切分。这里有Storm提供的6个Stream Grouping类型:

  1. 随机分组(Shuffle grouping):随机分发tuple到Bolt的任务,保证每个任务获得相等数量的tuple。
  2. 字段分组(Fields grouping):根据指定字段分割数据流,并分组。例如,根据“user-id”字段,相同“user-id”的元组总是分发到同一个任务,不同“user-id”的元组可能分发到不同的任务。
  3. 全部分组(All grouping):tuple被复制到bolt的所有任务。这种类型需要谨慎使用。
  4. 全局分组(Global grouping):全部流都分配到bolt的同一个任务。明确地说,是分配给ID最小的那个task。
  5. 无分组(None grouping):你不需要关心流是如何分组。目前,无分组等效于随机分组。但最终,Storm将把无分组的Bolts放到Bolts或Spouts订阅它们的同一线程去执行(如果可能)。
  6. 直接分组(Direct grouping):这是一个特别的分组类型。元组生产者决定tuple由哪个元组处理者任务接收。

当然还可以实现CustomStreamGroupimg接口来定制自己需要的分组。

项目实施

当下情况我们需要给Spout和Bolt设计一种能够处理大量数据(日志文件)的topology,当一个特定数据值超过预设的临界值时促发警报。使用Storm的topology,逐行读入日志文件并且监视输入数据。在Storm组件方面,Spout负责读入输入数据。它不仅从现有的文件中读入数据,同时还监视着新文件。文件一旦被修改Spout会读入新的版本并且覆盖之前的tuple(可以被Bolt读入的格式),将tuple发射给Bolt进行临界分析,这样就可以发现所有可能超临界的记录。

下一节将对用例进行详细介绍。

临界分析

这一节,将主要聚焦于临界值的两种分析类型:瞬间临界(instant thershold)和时间序列临界(time series threshold)。

  • 瞬间临界值监测:一个字段的值在那个瞬间超过了预设的临界值,如果条件符合的话则触发一个trigger。举个例子当车辆超越80公里每小时,则触发trigger。
  • 时间序列临界监测:字段的值在一个给定的时间段内超过了预设的临界值,如果条件符合则触发一个触发器。比如:在5分钟类,时速超过80KM两次及以上的车辆。

Listing One显示了我们将使用的一个类型日志,其中包含的车辆数据信息有:车牌号、车辆行驶的速度以及数据获取的位置。

车牌号
车辆行驶的速度
数据获取的位置
AB 123
60
North city
BC 123
70
South city
CD 234
40
South city
DE 123
40
East city
EF 123
90
South city
GH 123
50
West city

这里将创建一个对应的XML文件,这将包含引入数据的模式。这个XML将用于日志文件的解析。XML的设计模式和对应的说明请见下表。

XML文件和日志文件都存放在Spout可以随时监测的目录下,用以关注文件的实时更新。而这个用例中的topology请见下图。

Figure 1:Storm中建立的topology,用以实现数据实时处理

如图所示:FilelistenerSpout接收输入日志并进行逐行的读入,接着将数据发射给ThresoldCalculatorBolt进行更深一步的临界值处理。一旦处理完成,被计算行的数据将发送给DBWriterBolt,然后由DBWriterBolt存入给数据库。下面将对这个过程的实现进行详细的解析。

Spout的实现

Spout以日志文件和XML描述文件作为接收对象。XML文件包含了与日志一致的设计模式。不妨设想一下一个示例日志文件,包含了车辆的车牌号、行驶速度、以及数据的捕获位置。(看下图)

Figure2:数据从日志文件到Spout的流程图

Listing Two显示了tuple对应的XML,其中指定了字段、将日志文件切割成字段的定界符以及字段的类型。XML文件以及数据都被保存到Spout指定的路径。

Listing Two:用以描述日志文件的XML文件。

  <TUPLEINFO> 
  <FIELDLIST> 
  <FIELD> 
  <COLUMNNAME>vehicle_number</COLUMNNAME> 
  <COLUMNTYPE>string</COLUMNTYPE> 
  </FIELD> 
   
  <FIELD>
  <COLUMNNAME>speed</COLUMNNAME> 
  <COLUMNTYPE>int</COLUMNTYPE> 
  </FIELD> 
   
  <FIELD> 
  <COLUMNNAME>location</COLUMNNAME> 
  <COLUMNTYPE>string</COLUMNTYPE> 
  </FIELD> 
  </FIELDLIST> 
  <DELIMITER>,</DELIMITER> 
  </TUPLEINFO> 

通过构造函数及它的参数Directory、PathSpout和TupleInfo对象创建Spout对象。TupleInfo储存了日志文件的字段、定界符、字段的类型这些很必要的信息。这个对象通过XSTream序列化XML时建立。

Spout的实现步骤:

  • 对文件的改变进行分开的监听,并监视目录下有无新日志文件添加。
  • 在数据得到了字段的说明后,将其转换成tuple。
  • 声明Spout和Bolt之间的分组,并决定tuple发送给Bolt的途径。

Spout的具体编码在Listing Three中显示。

Listing Three:Spout中open、nextTuple和delcareOutputFields方法的逻辑。

  public void open( Map conf, TopologyContext context,SpoutOutputCollector collector ) 
  {   
      _collector = collector;   
      try   
      {   
         fileReader  =  new BufferedReader(new FileReader(new File(file)));  
       }  
       catch (FileNotFoundException e)  
       {  
          System.exit(1);   
       }  
  }                                                          
   
  public void nextTuple()  
  {  
      protected void ListenFile(File file)  
      {  
         Utils.sleep(2000);  
         RandomAccessFile access = null;  
         String line = null;   
         try   
         {  
             while ((line = access.readLine()) != null)  
             {  
                if (line !=null)  
                {   
                   String[] fields=null;  
                    if (tupleInfo.getDelimiter().equals("|"))  fields = line.split("\\"+tupleInfo.getDelimiter()); 
                    else   
                    fields = line.split  (tupleInfo.getDelimiter());   
                    if (tupleInfo.getFieldList().size() == fields.length)  _collector.emit(new Values(fields));
                 }  
              }  
           }  
           catch (IOException ex){ }  
      }  
  }  
   
  public void declareOutputFields(OutputFieldsDeclarer declarer)  
  {  
     String[] fieldsArr = new String [tupleInfo.getFieldList().size()];  
     for(int i=0; i<tupleInfo.getFieldList().size(); i++)  
     {  
         fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName();  
      }  
  declarer.declare(new Fields(fieldsArr));  
  } 

declareOutputFileds()决定了tuple发射的格式,这样的话Bolt就可以用类似的方法将tuple译码。Spout持续对日志文件的数据的变更进行监听,一旦有添加Spout就会进行读入并且发送给Bolt进行处理。

Bolt的实现

Spout的输出结果将给予Bolt进行更深一步的处理。经过对用例的思考,我们的topology中需要如Figure 3中的两个Bolt。

Figure 3:Spout到Bolt的数据流程。

ThresholdCalculatorBolt

Spout将tuple发出,由ThresholdCalculatorBolt接收并进行临界值处理。在这里,它将接收好几项输入进行检查;分别是:

临界值检查

  • 临界值栏数检查(拆分成字段的数目)
  • 临界值数据类型(拆分后字段的类型)
  • 临界值出现的频数
  • 临界值时间段检查

Listing Four中的类,定义用来保存这些值。

Listing Four:ThresholdInfo类

  public class ThresholdInfo implementsSerializable  
   
  {    
          private String action;   
          private String rule;   
          private Object thresholdValue;  
          private int thresholdColNumber;   
          private Integer timeWindow;   
          private int frequencyOfOccurence;   
  }   

基于字段中提供的值,临界值检查将被Listing Five中的execute()方法执行。代码大部分的功能是解析和接收值的检测。

Listing Five:临界值检测代码段

  1.public void execute(Tuple tuple, BasicOutputCollector collector)   
  2.{  
  3.    if(tuple!=null)   
  4.    {  
  5.        List<Object> inputTupleList = (List<Object>) tuple.getValues();  
  6.        int thresholdColNum = thresholdInfo.getThresholdColNumber();   
  7.        Object thresholdValue = thresholdInfo.getThresholdValue();   
  8.        String thresholdDataType = tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();   
  9.        Integer timeWindow = thresholdInfo.getTimeWindow();  
 10.         int frequency = thresholdInfo.getFrequencyOfOccurence();  
 11.         if(thresholdDataType.equalsIgnoreCase("string"))  
 12.         {  
 13.             String valueToCheck = inputTupleList.get(thresholdColNum-1).toString();  
 14.             String frequencyChkOp = thresholdInfo.getAction();  
 15.             if(timeWindow!=null)  
 16.             {  
 17.                 long curTime = System.currentTimeMillis();  
 18.                 long diffInMinutes = (curTime-startTime)/(1000);  
 19.                 if(diffInMinutes>=timeWindow)  
 20.                 {  
 21.                     if(frequencyChkOp.equals("=="))  
 22.                     {  
 23.                          if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
 24.                          {  
 25.                              count.incrementAndGet();  
 26.                              if(count.get() > frequency)  
 27.                                  splitAndEmit(inputTupleList,collector);  
 28.                          }  
 29.                     }  
 30.                     else if(frequencyChkOp.equals("!="))  
 31.                     {  
 32.                         if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
 33.                         {  
 34.                              count.incrementAndGet();  
 35.                              if(count.get() > frequency)  
 36.                                  splitAndEmit(inputTupleList,collector);  
 37.                          }  
 38.                      }  
 39.                      else                         System.out.println("Operator not supported");   
 40.                  }  
 41.              }  
 42.              else 
 43.              {  
 44.                  if(frequencyChkOp.equals("=="))  
 45.                  {  
 46.                      if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
 47.                      {  
 48.                          count.incrementAndGet();  
 49.                          if(count.get() > frequency)  
 50.                              splitAndEmit(inputTupleList,collector);  
 51.                          }  
 52.                  }  
 53.                  else if(frequencyChkOp.equals("!="))  
 54.                  {  
 55.                       if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
 56.                       {  
 57.                           count.incrementAndGet();  
 58.                           if(count.get() > frequency)  
 59.                               splitAndEmit(inputTupleList,collector);  
 60.                          }  
 61.                   }  
 62.               }  
 63.            }  
 64.            else if(thresholdDataType.equalsIgnoreCase("int") ||                     
     thresholdDataType.equalsIgnoreCase("double") ||                     
     thresholdDataType.equalsIgnoreCase("float") ||                     
     thresholdDataType.equalsIgnoreCase("long") ||                     
     thresholdDataType.equalsIgnoreCase("short"))  
 65.            {  
 66.                String frequencyChkOp = thresholdInfo.getAction();  
 67.                if(timeWindow!=null)  
 68.                {  
 69.                   long valueToCheck = Long.parseLong(inputTupleList.get(thresholdColNum-1).toString());  
 70.                   long curTime = System.currentTimeMillis();  
 71.                   long diffInMinutes = (curTime-startTime)/(1000);  
 72.                   System.out.println("Difference in minutes="+diffInMinutes);  
 73.                   if(diffInMinutes>=timeWindow)  
 74.                   {  
 75.                          if(frequencyChkOp.equals("<"))  
 76.                          {  
 77.                              if(valueToCheck < Double.parseDouble(thresholdValue.toString()))  
 78.                              {  
 79.                                   count.incrementAndGet();  
 80.                                   if(count.get() > frequency)  
 81.                                       splitAndEmit(inputTupleList,collector);  
 82.                              }  
 83.                          }  
 84.                          else if(frequencyChkOp.equals(">"))  
 85.                          {  
 86.                               if(valueToCheck > Double.parseDouble(thresholdValue.toString()))  
 87.                                {  
 88.                                   count.incrementAndGet();  
 89.                                   if(count.get() > frequency)  
 90.                                       splitAndEmit(inputTupleList,collector);  
 91.                               }  
 92.                           }  
 93.                           else if(frequencyChkOp.equals("=="))  
 94.                           {  
 95.                              if(valueToCheck == Double.parseDouble(thresholdValue.toString()))  
 96.                              {  
 97.                                  count.incrementAndGet();  
 98.                                  if(count.get() > frequency)  
 99.                                      splitAndEmit(inputTupleList,collector);  
100.                               }  
101.                           }  
102.                           else if(frequencyChkOp.equals("!="))  
103.                           {  
104.    . . .  
105.                            }  
106.                       }  
107.             }  
108.      else 
109.          splitAndEmit(null,collector);  
110.      }  
111.      else 
112.     {  
113.           System.err.println("Emitting null in bolt");  
114.           splitAndEmit(null,collector);  
115.    }  
116.} 

经由Bolt发送的的tuple将会传递到下一个对应的Bolt,在我们的用例中是DBWriterBolt。

DBWriterBolt

经过处理的tuple必须被持久化以便于触发tigger或者更深层次的使用。DBWiterBolt做了这个持久化的工作并把tuple存入了数据库。表的建立由prepare()函数完成,这也将是topology调用的第一个方法。方法的编码如Listing Six所示。

Listing Six:建表编码。

  1.public void prepare( Map StormConf, TopologyContext context )   
  2.{         
  3.    try   
  4.    {  
  5.        Class.forName(dbClass);  
  6.    }   
  7.    catch (ClassNotFoundException e)   
  8.    {  
  9.        System.out.println("Driver not found");  
 10.        e.printStackTrace();  
 11.    }  
 12.   
 13.    try   
 14.    {  
 15.       connection driverManager.getConnection(   
 16.           "jdbc:mysql://"+databaseIP+":"+databasePort+"/"+databaseName, userName, pwd);  
 17.       connection.prepareStatement("DROP TABLE IF EXISTS "+tableName).execute();  
 18.   
 19.       StringBuilder createQuery = new StringBuilder(  
 20.           "CREATE TABLE IF NOT EXISTS "+tableName+"(");  
 21.       for(Field fields : tupleInfo.getFieldList())  
 22.       {  
 23.           if(fields.getColumnType().equalsIgnoreCase("String"))  
 24.               createQuery.append(fields.getColumnName()+" VARCHAR(500),");  
 25.           else 
 26.               createQuery.append(fields.getColumnName()+" "+fields.getColumnType()+",");  
 27.       }  
 28.       createQuery.append("thresholdTimeStamp timestamp)");  
 29.       connection.prepareStatement(createQuery.toString()).execute();  
 30.   
 31.       // Insert Query  
 32.       StringBuilder insertQuery = new StringBuilder("INSERT INTO "+tableName+"(");  
 33.       String tempCreateQuery = new String();  
 34.       for(Field fields : tupleInfo.getFieldList())  
 35.       {  
 36.            insertQuery.append(fields.getColumnName()+",");  
 37.       }  
 38.       insertQuery.append("thresholdTimeStamp").append(") values (");  
 39.       for(Field fields : tupleInfo.getFieldList())  
 40.       {  
 41.           insertQuery.append("?,");  
 42.       }  
 43.   
 44.       insertQuery.append("?)");  
 45.       prepStatement = connection.prepareStatement(insertQuery.toString());  
 46.    }  
 47.    catch (SQLException e)   
 48.    {         
 49.        e.printStackTrace();  
 50.    }         
 51.}

数据分批次的插入数据库。插入的逻辑由Listting Seven中的execute()方法提供。大部分的编码都是用来实现可能存在不同类型输入的解析。

Listing Seven:数据插入的代码部分。

  1.public void execute(Tuple tuple, BasicOutputCollector collector)   
  2.{  
  3.    batchExecuted=false;  
  4.    if(tuple!=null)  
  5.    {  
  6.       List<Object> inputTupleList = (List<Object>) tuple.getValues();  
  7.       int dbIndex=0;  
  8.       for(int i=0;i<tupleInfo.getFieldList().size();i++)  
  9.       {  
 10.           Field field = tupleInfo.getFieldList().get(i);  
 11.           try {  
 12.               dbIndex = i+1;  
 13.               if(field.getColumnType().equalsIgnoreCase("String"))               
 14.                   prepStatement.setString(dbIndex, inputTupleList.get(i).toString());  
 15.               else if(field.getColumnType().equalsIgnoreCase("int"))  
 16.                   prepStatement.setInt(dbIndex,  
 17.                       Integer.parseInt(inputTupleList.get(i).toString()));  
 18.               else if(field.getColumnType().equalsIgnoreCase("long"))  
 19.                   prepStatement.setLong(dbIndex,   
 20.                       Long.parseLong(inputTupleList.get(i).toString()));  
 21.               else if(field.getColumnType().equalsIgnoreCase("float"))  
 22.                   prepStatement.setFloat(dbIndex,   
 23.                       Float.parseFloat(inputTupleList.get(i).toString()));  
 24.               else if(field.getColumnType().equalsIgnoreCase("double"))  
 25.                   prepStatement.setDouble(dbIndex,   
 26.                       Double.parseDouble(inputTupleList.get(i).toString()));  
 27.               else if(field.getColumnType().equalsIgnoreCase("short"))  
 28.                   prepStatement.setShort(dbIndex,   
 29.                       Short.parseShort(inputTupleList.get(i).toString()));  
 30.               else if(field.getColumnType().equalsIgnoreCase("boolean"))  
 31.                   prepStatement.setBoolean(dbIndex,   
 32.                       Boolean.parseBoolean(inputTupleList.get(i).toString()));  
 33.               else if(field.getColumnType().equalsIgnoreCase("byte"))  
 34.                   prepStatement.setByte(dbIndex,   
 35.                       Byte.parseByte(inputTupleList.get(i).toString()));  
 36.               else if(field.getColumnType().equalsIgnoreCase("Date"))  
 37.               {  
 38.                  Date dateToAdd=null;  
 39.                  if (!(inputTupleList.get(i) instanceof Date))    
 40.                  {    
 41.                       DateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");  
 42.                       try   
 43.                       {  
 44.                           dateToAdd = df.parse(inputTupleList.get(i).toString());  
 45.                       }  
 46.                       catch (ParseException e)   
 47.                       {  
 48.                           System.err.println("Data type not valid");  
 49.                       }  
 50.                   }    
 51.                   else 
 52.                   {  
 53.            dateToAdd = (Date)inputTupleList.get(i);  
 54.            java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime());  
 55.            prepStatement.setDate(dbIndex, sqlDate);  
 56.            }     
 57.            }   
 58.        catch (SQLException e)   
 59.        {  
 60.             e.printStackTrace();  
 61.        }  
 62.    }  
 63.    Date now = new Date();            
 64.    try 
 65.    {  
 66.        prepStatement.setTimestamp(dbIndex+1, new java.sql.Timestamp(now.getTime()));  
 67.        prepStatement.addBatch();  
 68.        counter.incrementAndGet();  
 69.        if (counter.get()== batchSize)   
 70.        executeBatch();  
 71.    }   
 72.    catch (SQLException e1)   
 73.    {  
 74.        e1.printStackTrace();  
 75.    }             
 76.   }  
 77.   else 
 78.   {  
 79.        long curTime = System.currentTimeMillis();  
 80.       long diffInSeconds = (curTime-startTime)/(60*1000);  
 81.       if(counter.get()<batchSize && diffInSeconds>batchTimeWindowInSeconds)  
 82.       {  
 83.            try {  
 84.                executeBatch();  
 85.                startTime = System.currentTimeMillis();  
 86.            }  
 87.            catch (SQLException e) {  
 88.                 e.printStackTrace();  
 89.            }  
 90.       }  
 91.   }  
 92.}  
 93.   
 94.public void executeBatch() throws SQLException  
 95.{  
 96.    batchExecuted=true;  
 97.    prepStatement.executeBatch();  
 98.    counter = new AtomicInteger(0);  
 99.} 

一旦Spout和Bolt准备就绪(等待被执行),topology生成器将会建立topology并准备执行。下面就来看一下执行步骤。

在本地集群上运行和测试topology

  • 通过TopologyBuilder建立topology。
  • 使用Storm Submitter,将topology递交给集群。以topology的名字、配置和topology的对象作为参数。
  • 提交topology。

Listing Eight:建立和执行topology。

   1.public class StormMain  
   2.{  
   3.     public static void main(String[] args) throws AlreadyAliveException,   
   4.                                                   InvalidTopologyException,   
   5.                                                   InterruptedException   
   6.     {  
   7.          ParallelFileSpout parallelFileSpout = new ParallelFileSpout();  
   8.          ThresholdBolt thresholdBolt = new ThresholdBolt();  
   9.          DBWriterBolt dbWriterBolt = new DBWriterBolt();  
  10.          TopologyBuilder builder = new TopologyBuilder();  
  11.          builder.setSpout("spout", parallelFileSpout, 1);  
  12.          builder.setBolt("thresholdBolt", thresholdBolt,1).shuffleGrouping("spout");  
  13.          builder.setBolt("dbWriterBolt",dbWriterBolt,1).shuffleGrouping("thresholdBolt");  
  14.          if(this.argsMain!=null && this.argsMain.length > 0)   
  15.          {  
  16.              conf.setNumWorkers(1);  
  17.              StormSubmitter.submitTopology(   
  18.                   this.argsMain[0], conf, builder.createTopology());  
  19.          }  
  20.          else 
  21.          {      
  22.              Config conf = new Config();  
  23.              conf.setDebug(true);  
  24.              conf.setMaxTaskParallelism(3);  
  25.              LocalCluster cluster = new LocalCluster();  
  26.              cluster.submitTopology(  
  27.              "Threshold_Test", conf, builder.createTopology());  
  28.          }  
  29.     }  
  30.} 

topology被建立后将被提交到本地集群。一旦topology被提交,除非被取缔或者集群关闭,它将一直保持运行不需要做任何的修改。这也是Storm的另一大特色之一。

这个简单的例子体现了当你掌握了topology、spout和bolt的概念,将可以轻松的使用Storm进行实时处理。如果你既想处理大数据又不想遍历Hadoop的话,不难发现使用Storm将是个很好的选择。

相关文章 相关文档 相关视频



我们该如何设计数据库
数据库设计经验谈
数据库设计过程
数据库编程总结
数据库性能调优技巧
数据库性能调整
数据库性能优化讲座
数据库系统性能调优系列
高性能数据库设计与优化
高级数据库架构师
数据仓库和数据挖掘技术
Hadoop原理、部署与性能调优
 
分享到
 
 


MySQL索引背后的数据结构
MySQL性能调优与架构设计
SQL Server数据库备份与恢复
让数据库飞起来 10大DB2优化
oracle的临时表空间写满磁盘
数据库的跨平台设计
更多...   


并发、大容量、高性能数据库
高级数据库架构设计师
Hadoop原理与实践
Oracle 数据仓库
数据仓库和数据挖掘
Oracle数据库开发与管理


GE 区块链技术与实现培训
航天科工某子公司 Nodejs高级应用开发
中盛益华 卓越管理者必须具备的五项能力
某信息技术公司 Python培训
某博彩IT系统厂商 易用性测试与评估
中国邮储银行 测试成熟度模型集成(TMMI)
中物院 产品经理与产品管理
更多...