1
流数据监控设计概述
1.1 概述前言
1.之前跟大家说要给大家写一些storm实时处理的代码,本来打算周末写的,但周末去爬香山了,所以…迟了几天(这些算是废话)。
2.网上有人贴出的关于GPS实时处理的代码,个人感觉其实时处理只是在速度这一属性上进行上限处理有些简单了,所以想自己设计个项目,所以了“流数据监控”这个模拟项目。
3.目前这个模拟项目比较简单(高手眼中),但总体框架有了,我会一步一步完善,慢慢会补充完整。
1.2 设计大体概述
1.2.1 数据流产生:Spout
数据流的产生目前使用的比较多的是:log文件读取、从mysql(或者是相关db)中获取、从消息中间件(如metaq)中获取及使用socket从网络中获取。
补充:
在该项目中,由于我的metaq还没搭好,所以就直接采用读取log的方式作为源数据,往后会给出metaq作为数据源的接口及mysql作为数据源的接口等。
1.2.2 处理数据:HandleBolt
这里的HandleBolt是宽泛的概念,指对数据进行处理的相关Bolt,目前比较常见的处理方式是数据过滤、数据添加、部分数据统计、数据监控等等。这些都是比较常见的数据实时处理方式。
补充:
该项目中数据处理部分使用数据监控处理,及对数据流进行条件过滤,将部分符合条件的数据筛选出来做进一步处理,达到条件数据监控的目的。目前该部分支持多种条件判断方式组合,多个字段组合判断及多种逻辑判断方式。往后会进一步晚上。
1.2.3 数据持久化:LastingBolt
LastingBolt泛指数据在处理之后进行持久化操作的接口,常见的持久化操作接口是:直接打印(这个貌似不算)、写入file中、写入mysql(及其他db)中、写入消息中间件(metaq)供其他业务调用、使用Socket写入网络端口中等等。
补充:
该项目中暂时设计两个数据持久化Bolt,一个是直接打印出来(比较直观),二是存入mysql中。其他方式接口会慢慢的给出。
2 数据监控设计框架
2.1 数据监控设计拓扑
图2.1 数据监控设计拓扑
数据监控设计拓扑说明:
ReadLogSpout:
该部分spout从domain.log(稍后给出log说明)读取数据,每次读取一行记录,该数据为域名出售log,读取数据后交给MonitorBolt进行处理。
MonitorBolt:
该部分Bolt对数据进行解析,读取配置文件MonitorBolt.xml中的逻辑判断及相关过滤规则等,进行数据过滤,将符合条件的数据发射到下一级(稍后有Bolt分析)。
MysqlBolt:
在MonitorBolt处理完数据之后将数据存入mysql的库表中,数据库相关配置从配置文件MysqlBolt.xml中读取。
PrintBolt:
将结果直接打印出来。
2.2 数据流监控环境拓扑
图2.2 网络环境拓扑
拓扑说明:
该项目只搭建三个storm节点一个为Nimbus节点,两个supervisor节点,其中在Nimbus节点中又安装有mysql。
3 数据监控详细设计
3.1 源数据说明
目前源数据从domain.log中读取,该log为人工构造,模拟域名拍卖的log,其中有五个字段,分别为domain(域名)、value(售价)、time(申请年份)、validity(有效期)、seller(卖家)。详细如下:
图3.1 源数据说明
一行数据为一条记录,每条记录有5个属性。
3.2 数据监控设计
从Monitor的配置文件中说明数据监控的设计:
图3.2 MonitorBolt.xml截图
参数说明:
MatchLogic:条件间的逻辑关系,用于如下几个条件间的逻辑关系指明,其有“AND”及“OR”两种逻辑关系设置。
MatchType:判断类型列表,该列表指明了某个字段用何种匹配算法进行判断,regular为正则匹配、range为范围匹配、routine0为常规模糊匹配、routine1为常规完全匹配。
MatchField:匹配字段列表,指明对哪几个字段进行判断。
FieldValue:对应的字段值。
如上配置说明:对字段1/2/5分别进行正则、范围及常规模糊匹配,字段1满足正则匹配.*google.*,字段2满足从200到2001,字段5满足模糊匹配ina,只有三个条件同时满足“AND”,该数据才会发射到下一级。
3.3 数据Mysql处理
从MysqlBolt.xml中进行说明:
图3.3 MyslqBolt.xml截图
Myslq数据存储处理指明myslq的host,指明database及from,使用username及password将数据存储已经创建好的mysql表中。
3.4 源码简介
图3.4 源码树
源码简单说明:Storm包中为总体运行的Topology,Storm.base目前只有myslq预处理的一个类,storm.bolt为bolt处理类,包括monitorbolt及printbolt,storm.spout包中为spout源数据接口,storm.source为构造源数据的一个类(这个可以忽略),storm.xml为配置文件读取类,domain.log为源数据,MonitorBolt.xml及MyslqBolt.xml分别为配置文件。
Storm项目:流数据监控 <2>流数据监控代码详解
1 项目概述
1.1 数据流向
流数据监控为storm模拟项目,模拟数据源从log文件中读取数据,并逐条发射到监控Bolt中,MonitorsBolt读取配置文件MonitorBolt.xml中的匹配规则,包括正则匹配、范围匹配、常规模糊匹配及常规完全匹配,多个条件可以组合多种匹配方式,多个条件字段可以有两种不同的逻辑关系。MonitorBolt在处理数据之后(过滤出符合匹配规则的数据),发射到数据持久化Bolt中,MysqlBolt读取配置文件MysqlBolt.xml中mysql相关信息,包括mysql的host及端口,username及password,database及from,最后将数据插入mysql中。
1.2 代码树
图1.2 代码树
源码简单说明:
Storm包中为总体运行的Topology,Storm.base目前只有myslq预处理的一个类,storm.bolt为bolt处理类,包括monitorbolt及printbolt,storm.spout包中为spout源数据接口,storm.source为构造源数据的一个类(这个可以忽略),storm.xml为配置文件读取类,domain.log为源数据,MonitorBolt.xml及MyslqBolt.xml分别为配置文件。
2 代码详解
2.1 Package storm
* @author blogchong * @Blog www.blogchong.com * @email blogchong@gmail.com * @QQ_G 191321336 * @version 2014年11月9日 上午11:26:29 */ // 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。 builder.setSpout("readlog", new ReadLogSpout(), 1); //创建monitor监控过滤节点,指定该节点接收喷发节点的策略为随机方式。 builder.setBolt("monitor", new MonitorBolt("MonitorBolt.xml"), 3) .shuffleGrouping("readlog"); //创建mysql数据存储节点,并传入配置文件 builder.setBolt("mysql", new MysqlBolt("MysqlBolt.xml"), 3) .shuffleGrouping("monitor"); |
注:该部分代码显示了整个topology的结构,每个节点与节点之间的关系(发布与订阅),并且指明了每个节点的喷发方式。
2.2 Package storm.xml
MonitorXml.java: import java.io.File; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.DocumentBuilder; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.NodeList; File file = new File(fd); //创建xml文件模板 DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); DocumentBuilder db = dbf.newDocumentBuilder(); Document doc = db.parse(file); //将Parameter里的项存入list中 NodeList nl = doc.getElementsByTagName("Parameter"); //从list的item中获取参数值 Element e = (Element) nl.item(0); MatchLogic = e.getElementsByTagName("MatchLogic").item(0) .getFirstChild().getNodeValue(); MatchType = e.getElementsByTagName("MatchType").item(0) .getFirstChild().getNodeValue(); MatchField = e.getElementsByTagName("MatchField").item(0) .getFirstChild().getNodeValue(); FieldValue = e.getElementsByTagName("FieldValue").item(0) .getFirstChild().getNodeValue(); |
注:MyslqXml.java与MonitorXml.java核心代码相似,主要是调用java中解析xml的类,主要类见如上import。
2.3 Package storm.spout
ReadLogSpout.java: public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) { this.collector = collector; try { this.fis = new FileInputStream("domain.log"); this.isr = new InputStreamReader(fis, "UTF-8"); this.br = new BufferedReader(isr); } catch (Exception e) { e.printStackTrace(); } } public void nextTuple() { String str = ""; try { //逐行读取发射,直到末尾 while ((str = this.br.readLine()) != null) { this.collector.emit(new Values(str)); Thread.sleep(100); } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } |
注:该类为产生源数据的类,该类逐行读取log文件中的数据,发射到下一级处理Bolt中,读取文件时注意编码转换。
2.4 Package storm.base
MysqlOpt.java import java.io.Serializable; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; public class MysqlOpt implements Serializable { public Connection conn = null; PreparedStatement statement = null; // 连接数据库 public boolean connSQL(String host_p, String database, String username, String password) { String url = "jdbc:mysql://" + host_p + "/" + database + "?characterEncoding=UTF-8"; try { //使用jdbc驱动 Class.forName("com.mysql.jdbc.Driver"); conn = DriverManager.getConnection(url, username, password); return true; } catch (ClassNotFoundException cnfex) { System.out .println("MysqlBolt-- Error: Loading JDBC/ODBC dirver failed!"); cnfex.printStackTrace(); } catch (SQLException sqlex) { System.out.println("MysqlBolt-- Error: Connect database failed!"); sqlex.printStackTrace(); } return false; } // 插入数据 public boolean insertSQL(String sql) { try { statement = conn.prepareStatement(sql); statement.executeUpdate(); return true; } catch (SQLException e) { System.out.println("MysqlBolt-- Error: Insert database failed!"); e.printStackTrace(); } catch (Exception e) { System.out.println("MysqlBolt-- Error: Insert failed!"); e.printStackTrace(); } return false; } // 关闭连接 public void deconnSQL() { try { if (conn != null) conn.close(); } catch (Exception e) { System.out.println("MysqlBolt-- Error: Deconnect database failed!"); e.printStackTrace(); } } } |
注:该类是mysql的操作类,包括mysql的链接、数据插入及数据库关闭等操作,供Mysqlbolt调用。
2.5 Package storm.bolt
Monitorbolt.java: public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { System.out.println("MonitorBolt -- Start!"); this.collector = collector; // 从conf中获取参数 new MonitorXml(this.monitorXml).read(); this.MatchLogic = MonitorXml.MatchLogic; this.MatchType = MonitorXml.MatchType; this.MatchField = MonitorXml.MatchField; this.FieldValue = MonitorXml.FieldValue; } public void execute(Tuple input) { //订阅str String str = input.getString(0); if (this.flag_par == false) { System.out .println("MonitorBolt-- Erre: can't get the path of Monitor.xml!"); } else { //调用Monitor进行条件判断,除了str,其他参数为配置文件中读取的列表 boolean moni = Monitor(str, this.MatchLogic, this.MatchType, this.MatchField, this.FieldValue); if (moni == true) { // System.out.println("Monitor!!!"); this.collector.emit(new Values(str)); } } } private boolean Monitor(String str, String logic, String type, String field, String value) { //将列表拆分 String[] types = type.split("::"); String[] fields = field.split("::"); String[] values = value.split("::"); int flag_init = types.length; int flag = 0;//判断标志 if (logic.equals("AND")) {//逻辑AND for (int i = 0; i < flag_init; i++) { if (types[i].equals("regular")) { //调用正则匹配方法regular boolean regu = regular(str, fields[i], values[i]); if (regu == true) { flag++; } } else if (types[i].equals("range")) { //调用范围匹配方法range boolean ran = range(str, fields[i], values[i]); if (ran == true) { flag++; } } else if (types[i].equals("routine0")) { //调用常规模糊匹配方法routine0 boolean rou0 = routine0(str, fields[i], values[i]); if (rou0 == true) { flag++; } } else if (types[i].equals("routine1")) { //调用常规完全匹配方法routine1 boolean rou1 = routine1(str, fields[i], values[i]); if (rou1 == true) { flag++; } } } if (flag == flag_init) { //所有条件都满足时 return true; } else { return false; } } else if (logic.equals("OR")) {//逻辑OR for (int i = 0; i < flag_init; i++) { if (types[i].equals("regular")) { boolean regu = regular(str, fields[i], values[i]); if (regu == true) { flag++; } } else if (types[i].equals("range")) { boolean ran = range(str, fields[i], values[i]); if (ran == true) { flag++; } } else if (types[i].equals("routine0")) { boolean rou0 = routine0(str, fields[i], values[i]); if (rou0 == true) { flag++; } } else if (types[i].equals("routine1")) { boolean rou1 = routine1(str, fields[i], values[i]); if (rou1 == true) { flag++; } } } if (flag != 0) { return true; } else { return false; } } return false; } // 正则匹配判断 private boolean regular(String str, String field, String value) { String[] strs = str.split("\t"); Pattern p = Pattern.compile(value); Matcher m = p.matcher(strs[Integer.parseInt(field) - 1]); boolean result = m.matches(); if (result == true) { return true; } else { return false; } } // 范围匹配 private boolean range(String str, String field, String value) { String[] strs = str.split("\t"); String[] values = value.split(","); int strss = Integer.parseInt(strs[Integer.parseInt(field) - 1]); if (values.length == 1) { if (strss > Integer.parseInt(values[0])) { return true; } else { return false; } } else if (values.length == 2 && values[0].length() == 0) { if (strss < Integer.parseInt(values[1])) { return true; } else { return false; } } else if (values.length == 2 && values[0].length() != 0) { if (strss > Integer.parseInt(values[0]) && strss < Integer.parseInt(values[1])) { return true; } else { return false; } } else { return false; } } // 常规模糊匹配 private boolean routine0(String str, String field, String value) { String[] strs = str.split("\t"); String strss = strs[Integer.parseInt(field) - 1]; if (strss.contains(value) && !strss.equals(value)) { return true; } else { return false; } } // 常规完全匹配 private boolean routine1(String str, String field, String value) { String[] strs = str.split("\t"); String strss = strs[Integer.parseInt(field) - 1]; if (strss.equals(value)) { return true; } else { return false; } } |
注1:该类主要设计了匹配规则,支持多种匹配方式,包括正则、范围、常规模糊及完全匹配,且支持两种逻辑判断关系。
MyslqBolt.java: public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { System.out.println("MysqlBolt -- Start!"); this.collector = collector; // 初始化mysql Loading(); } // 参数初始化 public void Loading() { new MysqlXml(this.mysqlXml).read(); String host_port = MysqlXml.Host_port; // mysql地址及端口 String database = MysqlXml.Database; // 数据库名 String username = MysqlXml.Username; // 用户名 String password = MysqlXml.Password; // 密码 this.from = MysqlXml.From; // 表名 if (this.mysql.connSQL(host_port, database, username, password) == false) { System.out .println("MysqlBolt--Config errer, Please check Mysql-conf: " + this.mysqlXml); flag_xml = false; } else { System.out.println("MysqlBolt-- Conf Loaded: " + this.mysqlXml); } } public void execute(Tuple input) { String str = input.getString(0); if (this.flag_par == false) { System.out .println("MysqlBolt-- Erre: can't get the path of Mysql.xml!"); } else { if (this.flag_xml == true) { String insert = send_str(str); if (this.mysql.insertSQL(insert) == false) { System.out .println("MysqlBolt-- Erre: can't insert tuple into database!"); System.out.println("MysqlBolt-- Error Tuple: " + str); } } } } //插入mysql语句构造方法 public String send_str(String str) { String send_tmp = null; String field[] = str.split("\t"); for (int i = 0; i < field.length; i++) { if (i == 0) { send_tmp = "'" + field[0] + "', '"; } else if (i == (field.length - 1)) { send_tmp = send_tmp + field[i] + "'"; } else { send_tmp = send_tmp + field[i] + "', '"; } } String send = "insert into " + this.from + " values (" + send_tmp + ");"; return send; } |
注2:该类主要用于数据存储,调用了base包中的mysqlOpt类中的多个方法,对mysql进行连接,数据插入及数据库关闭等等。
Storm项目:流数据监控 <3>流数据监控示例运行
1 文档说明
该文档为storm模拟项目第三份文档,第一份文档简单的介绍了模拟项目的设计,第二份文档为关键代码详解,这份文档则是示例运行的文档,从源代码打包到配置文件配置,lib文件导入,及任务提交,最后到处理数据输出到mysql中,这一整个流程。
关于该部分代码可以到博客中留言获取,或者是加入191321336扣扣群获取。
2 示例说明
2.1 数据源
//这个数据源构造的缘由是,当年想过要建站的,关注过,所以有此。
如今网站终于建起来了,热泪盈盈啊~~~
图2.1 源数据
数据源形如以上截图,共有五个字段,字段解释则不一一说明,详见文档一。以上形式数据作为数据源。
2.2 过滤规则
图2.2 monitorBolt.xml过滤配置
设置如上过滤规则,及过滤符合以下规则的数据流:第一个字段正则匹配.*google.*,第二字段在200至2001的范围内,第五个字段符合常规模糊匹配,即字段值中包含ina几个字符。且以上三个字段逻辑关系为“AND”,即同时满足以上三个条件,数据才会被过滤出来。
2.3 数据存储
图2.3 mysqlBolt.xml存储配置
以上为mysql存储配置,mysql地址为192.168.2.240(在nimbus节点上安装有mysql),提前创建好storm数据库,在该库中创建表monitor。
3 详细步骤
3.1 配置文件
图3.1-1 配置文件路径
配置文件路径我写的是/root/hcy/jar/,将这两个xml文件放在supervisor节点的该目录下,程序运行时会去该路径查找。并且将数据源domian.log也放到该目录下。
具体配置配置方式,参考第二章中的图2.2及2.3
3.2 代码打包
图3.2 代码打包
使用eclipse中的file选项下的export选项,导出类型为Jar file。将生成的jar包放入nimbus节点下。
3.3 数据库准备
在mysql中创建出storm数据库,在该库中创建表monitor,如下:
图3.3-1 创建表monitor
图3.3-2 表描述
Ps:为省事我全部创建了char类型的哦。
注意:
必须将mysql配置成能够远程访问的。
如下:
图3.3-3 配置mysql
3.4 环境准备
3.4.1 配置
关于storm.yaml配置如下:
图3.4.1 storm.yaml配置
3.4.2 启动集群
在三个节点上启动ZK,在nimbus上启动用命令storm nimbus&启动nimbus及用storm
ui&命令启动UI监控页面,在supervisor上启动storm supervisor&。
在nimbus所在机器上启动mysql。
3.5 提交任务
图3.5-1 提交任务命令
我把jar包放在nimbus下/root/hcy/jar目录下。执行以上命令,使用监控页面监控任务状态。
图3.5-2 UI监控页面
3.6 输出查询
远程登录myslq:
图3.6-1 进入storm数据库
进行过滤结果查询:
图3.6-2 10条过滤数据查询结果
4 项目扩展
下一步计划改善spout接口,计划源数据从metaq(消息队列中读取)获取,写一个metaq与storm的接口。
有兴趣的朋友请继续关注博客虫。
我会将这个模拟项目一步一步的完善,初步计划如下:
1、布置metaq集群,写metaq与storm的接口;
2、实现线上更新(例如动态更改过滤规则);
3、部署hadoop集群,写hdfs与storm接口;
4、支持类Top N统计处理。 |