1
文档说明
该文档为storm模拟项目系列文档之一,是MetaQ与storm接口的说明文档,主要介绍了如何集成MetaQ到项目代码中。
软件(阿里),其对应的许多技术文档还是比较容易看的,并且Github提供了许多的应用实例,所以使用MetaQ作为Storm的消息源之一是必须掌握的。
该模拟项目中,使用MetaQ作为storm的消息源,MetaqSpout从指定的zkconnect及topic中读取数据,发布到节点中。此外,还写了MetaQ与storm的生产者接口,即MetaqBolt指定Topic将数据写入Metaq中供其他业务系统继续使用。
好了文档说明就这些了,代码马上会随文档更新(通常情况下,代码调试成功了我才会写文档的)。
2 MetaQ与Storm接口
2.1 MetaqSpout
2.1.1 接口说明
该接口参考自Github,作了部分修改。项目设计中,使用storm.xml.MetaqSpoutXml读取MetaqSpout对应的配置文件MetaqSpout.xml
配置文件中,指明zkconnect的地址及端口号、metaq的root目录、对应的消费topic及其消费组(这个很重要)。
读取配置之后,将配置传递到spout的open部分进行初始化工作,主要是进行消费者参数设定(包括zkconnect、root目录、Topic及Group设置)等。
在nextTuple方法中,进行消息(message)拉取(poll),每次拉取一条记录,发布到下一个拓扑节点中。
2.1.2 上代码
贴部分主要代码(详细参考代码包):
//构造函数,传递xml地址 public MetaqSpout(String MetaqSpoutXml) { super(); this.metaqspoutxml = MetaqSpoutXml; } //实例化参数配置类 private ZKConfig zkConfig = new ZKConfig(); private MetaClientConfig metaClientConfig = new MetaClientConfig(); private final Scheme scheme = new StringScheme(); //初始化调用 public void open(final Map conf, final TopologyContext context,
final SpoutOutputCollector collector) { //从xml中获取参数 new MetaqSpoutXml(this.metaqspoutxml).read(); this.zkConfig.zkConnect = MetaqSpoutXml.zkConnect;//"192.168.2.240:2181"; this.zkConfig.zkRoot = MetaqSpoutXml.zkRoot;//"/meta"; String topic = MetaqSpoutXml.topic; String group = MetaqSpoutXml.group; this.metaClientConfig.setZkConfig(this.zkConfig); this.consumerConfig = new ConsumerConfig(group); //final String topic = (String) conf.get(TOPIC); if (topic == null) { throw new IllegalArgumentException(TOPIC + " is null"); } Integer maxSize = (Integer) conf.get(FETCH_MAX_SIZE); if (maxSize == null) { log.warn("Using default FETCH_MAX_SIZE"); maxSize = DEFAULT_MAX_SIZE; } this.id2wrapperMap = new ConcurrentHashMap<Long, MetaMessageWrapper>(); this.messageQueue = new LinkedTransferQueue<MetaMessageWrapper>(); try { this.collector = collector; this.setUpMeta(topic, maxSize); } catch (final MetaClientException e) { log.error("Setup meta consumer failed", e); } } private void setUpMeta(final String topic, final Integer maxSize) throws MetaClientException { this.sessionFactory = new MetaMessageSessionFactory(this.metaClientConfig); this.messageConsumer = this.sessionFactory.createConsumer(this.consumerConfig); this.messageConsumer.subscribe(topic, maxSize, new MessageListener() { public void recieveMessages(final Message message) { final MetaMessageWrapper wrapper = new MetaMessageWrapper(message); MetaqSpout.this.id2wrapperMap.put(message.getId(), wrapper); MetaqSpout.this.messageQueue.offer(wrapper); try { wrapper.latch.await(); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); } // 消费失败,抛出运行时异常 if (!wrapper.success) { throw new RuntimeException("Consume message failed"); } } public Executor getExecutor() { return null; } }).completeSubscribe(); } //关闭时调用,进行consumer的shutdown操作 public void close() { try { this.messageConsumer.shutdown(); } catch (final MetaClientException e) { log.error("Shutdown consumer failed", e); } try { this.sessionFactory.shutdown(); } catch (final MetaClientException e) { log.error("Shutdown session factory failed", e); } } //消息发布 public void nextTuple() { if (this.messageConsumer != null) { try { //进行消息拉取 final MetaMessageWrapper wrapper = this.messageQueue.poll
(WAIT_FOR_NEXT_MESSAGE, TimeUnit.MILLISECONDS); if (wrapper == null) { return; } final Message message = wrapper.message; this.collector.emit(this.scheme.deserialize(message.getData()), message.getId()); } catch (final InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } //消息操作成功确认机制 public void ack(final Object msgId) { if (msgId instanceof Long) { final long id = (Long) msgId; final MetaMessageWrapper wrapper = this.id2wrapperMap.remove(id); if (wrapper == null) { log.warn(String.format("don't know how to ack(%s: %s)", msgId.getClass().getName(), msgId)); return; } wrapper.success = true; wrapper.latch.countDown(); } else { log.warn(String.format("don't know how to ack(%s: %s)", msgId.getClass().getName(), msgId)); } } //消费失败时返回 public void fail(final Object msgId) { if (msgId instanceof Long) { final long id = (Long) msgId; final MetaMessageWrapper wrapper = this.id2wrapperMap.remove(id); if (wrapper == null) { log.warn(String.format("don't know how to reject(%s: %s)", msgId.getClass().getName(), msgId)); return; } wrapper.success = false; wrapper.latch.countDown(); } else { log.warn(String.format("don't know how to reject(%s: %s)", msgId.getClass().getName(), msgId)); } } |
2.2 MetaqBolt
2.2.1 接口说明
该部分代码修改自Github上的Metaq异步生产者实例。设计这个Bolt的原因是,部分业务有这种需求,当经过storm实时处理后,数据发送到下一个业务系统,当下一个业务系统也是从metaq拉取数据时,就需要我们把处理过的数据写入到metaq中去,所以有了这个接口。
其读取配置文件的过程与MetaqSpout相似,但是没有组(Group)的概念,只需指定地址、目录及Topic(前提是Metaq上有该Topic),则可以把数据写入metaq中。
2.2.1 上代码
该部分代码较简单,可以参考AsyncConsumer代码。
//构造,传递配置路径 public MetaqBolt(String MetaqSpoutXml) { super(); this.metaqspoutxml = MetaqSpoutXml; } //初始化操作 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { System.out.println("MetaqBolt -- Start!"); this.collector = collector; // 初始化metaq的一些设置,包括zk链接地址,根目录等 this.zkConfig.zkConnect = MetaqSpoutXml.zkConnect;// "192.168.2.240:2181"; this.zkConfig.zkRoot = MetaqSpoutXml.zkRoot;// "/meta"; this.topic = MetaqSpoutXml.topic; this.metaClientConfig.setZkConfig(this.zkConfig); try { this.sessionFactory = new MetaMessageSessionFactory( this.metaClientConfig); } catch (MetaClientException e) { e.printStackTrace(); } this.producer = this.sessionFactory.createProducer(); this.producer.publish(this.topic);// 发布topic } public void execute(Tuple input) { String str = input.getString(0); try { this.sendResult = producer.sendMessage(new Message(this.topic, str .getBytes())); } catch (MetaClientException | InterruptedException e) { e.printStackTrace(); } //当生产失败时打印失败数据 if (!this.sendResult.isSuccess()) { System.err.println("Send message failed,error message:" + sendResult.getErrorMessage()); } } |
3 代码改动说明
关于此次代码变动较大,加了一个spout源的接口,一个bolt的数据落地接口,对topology进行了优化。
具体如下:
(1) 增加了MetaqSpout接口,实现从MetaQ中读取数据(重点)
(2) 增加了MetaqBolt接口,实现新的数据落地接口,将数据写入MetaQ中
(3) 修改了Topology主类,实现了节点可配置,通过配置文件列表,即不同类型的spout及bolt可动态搭配,想要实现不同拓扑功能,不用修改代码,而只需修改配置即可(重点)
4 关于Metaq的报错
之前调试代码时遇到一个错误,纠结了很久,后面还是群里的一个朋友指点,才知道了错在哪里,所以把这个错误记载下来。
4.1 报错
PS:根据错误提示,总是没找到其原因。
4.2 解决
在群里朋友的指点下,查看了metaq的启动日志。
才发现metaq往zk注册的服务器ip是192.168.122.1不是我本机的ip,之前对metaq进行配置的时候,并没有进行hostName配置,因为metaq据说默认的注册ip是localhost所以就没有注意了,但是好像这种情况来看,他进行zk注册的时候使用的是其代码内部的预留ip进行注册。
我在metaq的server.ini中进行了hostName配置,这个问题就解决了。
Storm项目:流数据监控 <5>Zookeeper统一配置
1 文档说明
好久没更新storm相关的文章了(快一个月了),很早以前就有朋友提出过storm的拓扑任务会出现supervisor各节点配置麻烦的问题,基于此,我很早就考虑如何解决这个问题,直到现在才把这个问题解决(工作小忙)。
使用zookeeper进行设计storm拓扑的配置系统,设计思路大家可以参考参考,代码部分就有点次了(时间太匆忙了,白天上班,大半夜写代码,写文章不容易啊),大家能看得下去就看吧,不过优化优化改改还是能用的。
好了,不多说了,喜欢的朋友,关注博客虫,会时常更新点东西,有兴趣一起交流的朋友,加入到191321336的技术群中,一起讨论交流~~~
2 Zookeeper统一配置
2.1 设计统一配置因素
在storm的拓扑任务中,采用常规的节点配置从配置文件(conf)中直接读取,会出现以下几个问题:
(1) 节点太多,配置文件需要配置的节点太多,太繁琐;
(2) 配置需要修改时,很难保证各个节点的配置文件统一;
(3) 无法定位各个功能节点位置,每个节点需要所有配置文件,导致每个节点配置臃肿;
(4) 配置文件太分散,动态更新不现实。
以上几个主要原因导致了,采用传统的直接读取配置文件获取配置信息的方式不适合类似这种集群应用,必然需要设计一个能够进行统一配置的组件或者是系统。
2.2 配置系统技术选型
想要实现集群配置文件的统一配置,必然不能将配置文件放在各个分散的节点上,考虑的思路是放在一个节点上,然后让该节点读取,其他节点从该节点获取,这样就保证了配置文件统一性(只此一份)。可以考虑以下几种设计思路:
(1) 在storm的拓扑主类(Topology)中使用Config conf
= new Config();进行参数传递,及在主类中读取配置文件中的所有配置,然后通过Config类传递给相应节点。这种方式,笔者曾经尝试过,确实能解决配置文件统一问题,只需nimbus节点读取配置即可,但是,这种方式在配置文件很多时,会导致主类Topology相当臃肿,相当麻烦,并且相应节点获取参数时也相当麻烦。
(2) 考虑使用mysql。在nimbus中,读取配置文件,然后将配置信息按一定规则存放到一个所有节点都能访问的mysql服务器中。该mysql服务器专门用来保存集群的配置文件,所有节点都上该mysql服务器中读取配置信息,这种方式能够解决配置统一的问题,并且也不少集群应用是这么干的。但这种方式有不少的缺点,首先是需要部署一个专门的mysql服务器,这是额外的代价,如果部署到其中一个节点中倒是可以,但也会影响性能,并且mysql并没有对配置信息变更有效的监控措施,并且存在单点问题,安全性,实用性都不太高。
(3) 还有就是使用zookeeper来实现了。Zookeeper的其中有个功能有点类似向zookeeper集群中提供公共文件夹(官方语言称其为“节点”)。能够访问zk服务器的服务器都能够对其公共节点进行访问。利用该机制,我们就可以使用zk设计统一配置系统了。Nimbus以客户端的身份向zk服务器申请公共节点,并在公共节点上存储配置信息,其他节点需要获取配置文件时,以客户端的身份从zk服务器中获取公共节点的相关值,这样就实现了统一配置。实用zk进行统一配置有如下好处:storm集群中已经有zk,不需要额外部署;zk无单点故障问题,配置信息安全性高;zk对节点具有watch功能,节点变更能够进行提醒并可以自定义相关操作。所以就目前来说,使用zk进行系统统一配置是比较合理的。
2.3 统一配置设计方案
图2.3 Zookeeper统一配置架构图
(1) 如图2.3所示,nimbus读取Topology.conf文件,将除nimbus节点配置参数以外的配置信息放入zookeeper服务器(zk集群)的最底层公共节点上(创建一定规则的节点树,放在最底层的子节点上);
(2) zookeeper集群维护这些配置节点,进行永久保持(除非主动删除),并执行watcher,监视配置节点的变更,随时通知supervisor配置变化;
(3) supervisor第一次从zk集群中读取配置信息,然后开始检测zk的watcher信号,当检测到wather信号(配置变更),重新获取配置信息,从而不但实现了统一配置,还能实现配置的在线动态更新(这点非常重要);
(4) 整个zk中配置节点目录层次为第一层是总配置根节点/TopologyConf;第二层则是根据拓扑提交的任务名进行创建,使用任务民进行配置节点创建不会导致有重复配置出现,因为一个storm集群中,拓扑的任务名是唯一的;底层节点为各个存储配置的名称,值为配置参数。
整个架构围绕zookeeper而存在,而zookeeper也是storm集群的依赖集群,需要额外配置,只需跟storm共享zookeeper集群即可,并且不需要担心配置系统会出现故障,zookeeper集群不存在系统故障。
这个架构不单纯可以在storm的应用集群中使用,其他类似的集群应用配置都可以进行类似设计,能够进行统一配置,无单点故障,实现动态更新。
2.4 代码参考
PS:这部分代码由于时间关系,写的比较糙,只是实现了配置信息的存取,没有实现wather功能,并且相对于之前版本的工程代码,变化较大,舍弃了所有可观性不强的xml方式配置,使用标准方式配置格式,并把所有配置相关的类整合在一个包里(storm.conf),供主类及其他bolt调用。
2.4.1 storm.conf.OptZookeeper.java
该类主要定义了一些zookeeper的基本操作,作为一个基本类供其他conf类调用,其中主要的一些方法定义如下:
private static final int SESSION_TIMEOUT = 3000; private ZooKeeper zookeeper0; // 以下为各种zk操作方法 private Watcher watcher = new Watcher() { public void process(WatchedEvent event) { System.out.println("process: " + event.getType()); } }; // 链接zk,参数为连接zookeeper的server地址及端口,可设置多个以“,”隔开 // 如zkServer=192.168.2.240:2181,192.168.2.241:2181 public ZooKeeper connect(String zkServer) throws IOException { zookeeper0 = new ZooKeeper(zkServer, SESSION_TIMEOUT, watcher); return zookeeper0; } // 关闭zk public void close(ZooKeeper zookeeper) throws InterruptedException { zookeeper.close(); } // 创建节点 /** * 创建一个znode 1.CreateMode 取值 PERSISTENT:持久化,这个目录节点存储的数据不会丢失 * PERSISTENT_SEQUENTIAL * :顺序自动编号的目录节点,
这种目录节点会根据当前已近存在的节点数自动加1,
然后返回给客户端已经成功创建的目录节点名; * EPHEMERAL:临时目录节点,
一旦创建这个节点的客户端与服务器端口也就是session过期超时,这种节点会被自动删除 * EPHEMERAL_SEQUENTIAL:临时自动编号节点 * @throws InterruptedException * @throws KeeperException*/ // 若节点存在则只设置值,若不存在则创建,若出错返回false public boolean createNode(ZooKeeper zookeeper, String node, String value) { try { if (zookeeper.exists(node, null) != null) { zookeeper.setData(node, value.getBytes(), -1); } else { zookeeper.create(node, value.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { System.out.println(e.getMessage()); Assert.fail(); return false; } return true; } // 删除节点,忽略版本,若不存在则抛错 public boolean deleteNode(ZooKeeper zookeeper, String node) { try { if (zookeeper.exists(node, null) != null) { zookeeper.delete(node, -1); return true; } else { System.out.println(node + " exists now!"); return false; } } catch (Exception e) { System.out.println(e.getMessage()); Assert.fail(); return false; } } // 获取数据,若节点不存在抛错 public String getData(ZooKeeper zookeeper, String node) { String result = null; try { if (zookeeper.exists(node, null) != null) { byte[] bytes = zookeeper.getData(node, null, null); result = new String(bytes); } else { System.out.println(node + " exists now!"); } // 类型转换 } catch (Exception e) { // TODO Auto-generated catch block System.out.println(e.getMessage()); Assert.fail(); } return result; } // 获取数据,设置watch,只要节点改变(节点的值改变)则触发watcher,执行watcher中函数 public String getDataWatch(ZooKeeper zookeeper, String node) { String result = null; try { if (zookeeper.exists(node, null) != null) { byte[] bytes = zookeeper.getData(node, new Watcher() { public void process(WatchedEvent event) { System.out.println("testExistWatch2 Watch : {}" + event.getType()); } }, null); result = new String(bytes); } else { result = null; } } catch (Exception e) { // TODO Auto-generated catch block System.out.println(e.getMessage()); Assert.fail(); } return result; } // 设置对应目录下的数据,-1表示忽略版本 // 该设置能改变原始值 public boolean setData(ZooKeeper zookeeper, String node, String value) { try { if (zookeeper.exists(node, null) != null) { zookeeper.setData(node, value.getBytes(), -1); return true; } else { return false; } } catch (Exception e) { System.out.println(e.getMessage()); Assert.fail(); return false; } } // 获取子节点,返回一个list public List<String> getNodeChild(ZooKeeper zookeeper, String node) { try { if (zookeeper.exists(node, null) != null) { List<String> list = zookeeper.getChildren(node, true); return list; } else { return null; } } catch (Exception e) { System.out.println(e.getMessage()); Assert.fail(); return null; } } |
2.4.2 storm.conf.GetTopConf.java
该类主要是提供给主类相关的配置信息,由于主类是在nimbus上执行,所以其所需配置文件则直接获取,不经过zookeeper配置系统。
public void getTopConf(String configFile) throws Exception { Properties pros = new Properties(); File config = new File(configFile); pros.load(new FileInputStream(config)); // 获取规则列表路径 this.TopologyList = pros.getProperty("topologyList.path", "TopologyList.rule"); // 获取worker数值 this.WorkerNum = pros.getProperty("worker.num", "10"); if (isNum.isNumeric(WorkerNum)) { this.MaxWorker = Integer.parseInt(WorkerNum); } if (TopologyList != null) { FileInputStream fis = new FileInputStream(TopologyList); InputStreamReader isr = new InputStreamReader(fis, "UTF-8"); BufferedReader br = new BufferedReader(isr); String line = ""; // 计算规则列表有效节点数 while ((line = br.readLine()) != null) { if (line.length() != 0) { this.Topology[this.Top_num] = line; this.Top_num++; } } } } |
2.4.3 storm.conf.PutConf.java
该类为主要的申请节点,往zookeeper中写入配置信息,该类供主类调用,在nimbus执行时,将配置信息写入zk中。
private static final int SESSION_TIMEOUT = 3000; private ZooKeeper zookeeper; private OptZookeeper optzk = new OptZookeeper(); public void putZkConf(String configFile, String taskName) throws Exception { Properties pros = new Properties(); File config = new File(configFile); pros.load(new FileInputStream(config)); // 从nimbus域中获取配置文件zk服务器地址并连接zk String nimbusZkConect = pros.getProperty("nimbus.zkConnect", "localhost:2181"); zookeeper = optzk.connect(nimbusZkConect); // 创建配置根目录 String ConfRoot = "TopologyConf"; String StrTmp = "/"; String node1 = StrTmp + ConfRoot; String node2 = StrTmp + ConfRoot + StrTmp + taskName; // 创建一级根目录"/TopologyConf" if (zookeeper.exists(node1, null) == null) { optzk.createNode(zookeeper, StrTmp + ConfRoot, "TopologyConf"); } // 创建二级根目录"/TopologyConf/taskName" if (zookeeper.exists(node2, null) != null) { List<String> list = optzk.getNodeChild(zookeeper, node2); for (String chNode : list) { optzk.deleteNode(zookeeper, node2 + StrTmp + chNode); } } else { optzk.createNode(zookeeper, node2, "taskName"); System.out.println("TaskName"); } // 获取MetaSpout并写入zk配置服务器中 String SpoutZkConnect = pros.getProperty("spout.zkConnect", "localhost:2181"); String SpoutZkRoot = pros.getProperty("spout.zkRoot", "/meta"); String SpoutZkTopic = pros.getProperty("spout.zkTopic", "storm_test"); String SpoutZkGroup = pros.getProperty("spout.zkGroup", "storm_test"); optzk.createNode(zookeeper, node2 + StrTmp + "spout.zkConnect", SpoutZkConnect); optzk.createNode(zookeeper, node2 + StrTmp + "spout.zkRoot", SpoutZkRoot); optzk.createNode(zookeeper, node2 + StrTmp + "spout.zkTopic", SpoutZkTopic); optzk.createNode(zookeeper, node2 + StrTmp + "spout.zkGroup", SpoutZkGroup); // 获取MetaBolt并写入zk配置服务器中 String BoltZkConnect = pros.getProperty("bolt.zkConnect", "localhost:2181"); String BoltZkRoot = pros.getProperty("bolt.zkRoot", "/meta"); String BoltZkTopic = pros.getProperty("bolt.zkTopic", "storm_test"); optzk.createNode(zookeeper, node2 + StrTmp + "bolt.zkConnect", BoltZkConnect); optzk.createNode(zookeeper, node2 + StrTmp + "bolt.zkRoot", BoltZkRoot); optzk.createNode(zookeeper, node2 + StrTmp + "bolt.zkTopic", BoltZkTopic); // 获取MonitorBolt参数并写入zk配置服务器中 String MatchLogic = pros.getProperty("match.logic", "AND"); String MatchType = pros.getProperty("match.type", "regular::range::routine0"); String MatchField = pros.getProperty("match.field", "1::2::5"); String MatchFieldValue = pros.getProperty("match.fieldValue", ".*google.*::200,2001::ina"); optzk.createNode(zookeeper, node2 + StrTmp + "match.logic", MatchLogic); optzk.createNode(zookeeper, node2 + StrTmp + "match.type", MatchType); optzk.createNode(zookeeper, node2 + StrTmp + "match.field", MatchField); optzk.createNode(zookeeper, node2 + StrTmp + "match.fieldValue", MatchFieldValue); // 获取MysqlBolt参数并写入zk配置服务器中 String MysqlHost = pros.getProperty("mysql.host_port", "192.168.2.240:3306"); String MysqlDatabase = pros.getProperty("mysql.database", "storm"); String MysqlFrom = pros.getProperty("mysql.from", "monitor"); String MysqlUsername = pros.getProperty("mysql.username", "root"); String MysqlPassword = pros.getProperty("mysql.password", "123456"); optzk.createNode(zookeeper, node2 + StrTmp + "mysql.host_port", MysqlHost); optzk.createNode(zookeeper, node2 + StrTmp + "mysql.database", MysqlDatabase); optzk.createNode(zookeeper, node2 + StrTmp + "mysql.from", MysqlFrom); optzk.createNode(zookeeper, node2 + StrTmp + "mysql.username", MysqlUsername); optzk.createNode(zookeeper, node2 + StrTmp + "mysql.password", MysqlPassword); optzk.close(zookeeper); } |
2.4.4 storm.conf.GetConf.java
该类为从zookeeper中获取配置信息的类,为各个bolt调用,从节点中获取配置信息。
public boolean getZkConf(String configFile, String taskName) throws Exception { Properties pros = new Properties(); File config = new File(configFile); pros.load(new FileInputStream(config)); // 从nimbus域中获取配置文件zk服务器地址并连接zk String nimbusZkConect = pros.getProperty("nimbus.zkConnect", "localhost:2181"); zookeeper = optzk.connect(nimbusZkConect); // 创建配置根目录 String ConfRoot = "TopologyConf"; String StrTmp = "/"; String node2 = StrTmp + ConfRoot + StrTmp + taskName; // 创建二级根目录"/TopologyConf/taskName" if (zookeeper.exists(node2, null) != null) { //// 获取MetaSpout的配置信息 SpoutZkConnect = optzk.getData(zookeeper, node2 + StrTmp + "spout.zkConnect"); SpoutZkRoot = optzk.getData(zookeeper, node2 + StrTmp + "spout.zkRoot"); SpoutZkTopic = optzk.getData(zookeeper, node2 + StrTmp + "spout.zkTopic"); SpoutZkGroup = optzk.getData(zookeeper, node2 + StrTmp + "spout.zkGroup"); // 获取MetaBolt的配置信息 BoltZkConnect = optzk.getData(zookeeper, node2 + StrTmp + "bolt.zkConnect"); BoltZkRoot = optzk.getData(zookeeper, node2 + StrTmp + "bolt.zkRoot"); BoltZkTopic = optzk.getData(zookeeper, node2 + StrTmp + "bolt.zkTopic"); // 获取MonitorBolt的配置信息 MatchLogic = optzk.getData(zookeeper, node2 + StrTmp + "match.logic"); MatchType = optzk.getData(zookeeper, node2 + StrTmp + "match.type"); MatchField = optzk.getData(zookeeper, node2 + StrTmp + "match.field"); MatchFieldValue = optzk.getData(zookeeper, node2 + StrTmp + "match.fieldValue"); // 获取MysqlBolt的配置信息 MysqlHost = optzk.getData(zookeeper, node2 + StrTmp + "mysql.host_port"); MysqlDatabase = optzk.getData(zookeeper, node2 + StrTmp + "mysql.database"); MysqlFrom = optzk.getData(zookeeper, node2 + StrTmp + "mysql.from"); MysqlUsername = optzk.getData(zookeeper, node2 + StrTmp + "mysql.username"); MysqlPassword = optzk.getData(zookeeper, node2 + StrTmp + "mysql.password"); optzk.close(zookeeper); return true; } else { System.out.println("Conf is not exist!"); return false; } } |
3 Zookeeper相关API
4 项目拓展
MonitorTopology目前已经完善了metaq与storm接口,实时过滤处理,数据存储方面实现了metaq重写接口,mysql存储接口,整个架构的统一配置系统。就总体实时处理架构来说,已经越来越完善。
项目扩展的目标:
(1) metaq数据获取,即尝试从现有前端动态日志中(模拟前端业务系统动态产生类似log4j日志)动态获取日志,并写入metaq中作为源数据;
(2) 丰富实时处理部分的bolt,增加其他功能bolt,如求Top N、条件统计、数据拆分等等;
(3) 丰富数据存储部分接口,特别是与hadoop接口;
(4) 使用zookeeper或者是mysql实现架构的元数据部分。
不积跬步无以至千里,我会一步一步把这个storm架构填充丰满。
Storm项目:流数据监控 <6>最新代码结构及详解
1 文档说明
DataOptTopology项目由之前的monitorTopology整理而来,并且在此基础上增加了功能,梳理了代码。
2 代码结构说明
2.1 代码树
2.1 storm
该包下只有一个启动主类,即构造拓扑结构的地方。在这里,你可以用现有的spout以及bolt根据构造top的基本规则,构造出自己想要的数据处理流程top来。
2.2 storm.base
//包含了各种其他类所需要的基本类,公共类。
ConfCheck.java
定时检测配置文件是否发生了更改,是一个线程类,供所有有配置文件的组件调用,主要是一些spout以及bolt组件。
MacroDef.java
静态变量的宏定义类,供所有的类调用。
MetaMessageWrapper.java
MetaSpout的一个辅助类。
MysqlOpt.java
封装了一些mysql的基本操作,比如连接,数据插入,数据库关闭等基本操作,主要是供MysqlBolt调用。
StringScheme.java
MetaSpout输出规范化辅助类,主要是申明接入数据的编码。
2.3 storm.bolt
//包含了各种bolt处理类
MonitorBolt.java
数据过滤bolt,包含了普通匹配、正则匹配、范围匹配等数据过滤功能。
MysqlBolt.java
数据Mysql落地接口,将top处理的数据最终存入Mysql中。
PrintBolt.java
直接将数据打印输出。
2.4 storm.spout
//定义了各种数据接入的spout
MetaSpout.java
数据从Metaq(消息中间件)中接入。
ReadLogSpout.java
直接从本地读取Log,并且发布出去。
2.5 storm.xml
//从xml配置文件中读取配置文件
MonitorXml.java
从MonitorBolt.xml中读取配置信息。
MysqlXml.java
从MysqlBolt.xml中读取配置信息。
SpoutXml.java
从MetaSpout.xml中读取配置信息。
2.6 storm.source
GetSource.java
利用随机机制产生domain.log,即虚拟域名交易数据集,是一个主类,模拟构造最初始的数据集。
2.7 storm.helloworld
//storm的helloworld程序,即word count实例,有主类,是个单独的模块。
HelloWorldTopology.java
HelloWorld主类,启动类,注意wordcountbolt的分组方式,是以字段分组。
ReadFileSpout.java
读取数据spout,与ReadLogSpout相似。
WordNormalizerBolt.java
数据规范化类,将输入的domain记录拆分,并且只将域名拆分为三部分,例如www.XXX.com拆成www、XXX、com,此外包括seller四个词向后发布。
WordCountBolt.java
单词统计,利用HashMap的原理。
PrintWorldCountBolt.java
打印单词统计信息,更直观的看到统计结果。
3 项目总结
3.1 项目现状
实现了一个Top的基本流程;
提供了一个过滤操作的处理方式;
提供了Metaq数据接入接口;
提供了Mysql数据落地接口;
提供了storm的helloworld实例;
实现了拓扑的动态配置,即不重启拓扑修改配置生效;
3.2 项目发展
实现top的定制化,即可以根据需求配置即可达到,不用重新打包;
实现metaq的bolt接口;
实现HDFS落地接口; |