Spark
Streaming应用与实战系列包括以下六部分内容:
1.背景与架构改造
2.通过代码实现具体细节,并运行项目
3.对Streaming监控的介绍以及解决实际问题
4.对项目做压测与相关的优化
5.Streaming持续优化之HBase
6.管理Streaming任务
本篇为第一部分,包括背景与架构改造、通过代码实现具体细节并运行项目、对Streaming监控的介绍以及解决实际问题、对项目做压测与相关的优化。
一、背景与架构改造
1.1 问题描述
有一块业务主要是做爬虫抓取与数据输出,通过大数据这边提供的SOA服务入库到HBase,架构大致如下:
架构改造之前
以对于以上的架构存在一些问题,我们可以看见数据在Dubbox服务阶段处理后直接通过HBase API入库了HBase,中间并没做任何缓冲,要是HBase出现了问题整个集群都完蛋,没法写入数据,数据还丢失,HBase这边压力也相当大,针对这一点,对入库HBase这个阶段做了一些改造。
1.2 架构改造
改造后的架构,爬虫通过接口服务,入库到Kafka,Spark streaming去消费kafka的数据,入库到HBase.核心组件如下图所示:
架构改造图
为什么不直接入库到HBase,这样做有什么好处?
1.缓解了HBase这边峰值的压力,并且流量可控;
2.HBase集群出现问题或者挂掉,都不会照成数据丢失的问题;
3.增加了吞吐量。
1.3 为什么选择Kafka和Spark streaming
1.由于Kafka它简单的架构以及出色的吞吐量;
2.Kafka与Spark streaming也有专门的集成模块;
3.Spark的容错,以及现在技术相当的成熟。
二、通过代码实现具体细节,并运行项目
然后就开始写代码了,总体思路就是:
1.put数据构造json数据,写入Kafka;
2.Spark Streaming任务启动后首先去Zookeeper中去读取offset,组装成fromOffsets;
3.Spark Streaming 获取到fromOffsets后通过KafkaUtils.createDirectStream去消费Kafka的数据;
4.读取Kafka数据返回一个InputDStream的信息,foreachRDD遍历,同时记录读取到的offset到zk中;
5.写入数据到HBase。
详细一点的架构图
2.1 初始化与配置加载
下面是一些接收参数,加载配置,获取配置中的topic,还有初始化配置,代码如下:
//接收参数
val Array(kafka_topic, timeWindow, maxRatePerPartition)
= args
//加载配置
val prop: Properties = new Properties()
prop.load(this.getClass().getResourceAsStream
("/kafka.properties"))
val groupName = prop.getProperty("group.id")
//获取配置文件中的topic
val kafkaTopics: String = prop.getProperty("kafka.topic."
+ kafka_topic)
if (kafkaTopics == null || kafkaTopics.length
<= 0) {
System.err.println("Usage: KafkaDataStream
<kafka_topic>
is number from kafka.properties")
System.exit(1)
}
val topics: Set[String] = kafkaTopics.split(",").toSet
val kafkaParams = scala.collection.immutable.Map[String,
String](
"metadata.broker.list" -> prop.getProperty("bootstrap.servers"),
"group.id" -> groupName,
"auto.offset.reset" -> "largest")
val kc = new KafkaCluster(kafkaParams)
//初始化配置
val sparkConf = new SparkConf()
.setAppName(KafkaDataStream.getClass.getSimpleName
+ topics.
toString())
.set("spark.yarn.am.memory", prop.getProperty("am.memory"))
.set("spark.yarn.am.memoryOverhead",
prop.getProperty
("am.memoryOverhead"))
.set("spark.yarn.executor.memoryOverhead",
prop.getProperty
("executor.memoryOverhead"))
.set("spark.streaming.kafka.maxRatePerPartition",
maxRatePerPartition) //此处为每秒每个partition的条数
.set("spark.serializer", "org.apache.spark.serializer
.KryoSerializer")
.set("spark.reducer.maxSizeInFlight",
"1m")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(timeWindow.toInt))
//多少秒处理一次请求 |
只是需要注意一下,这里的KafkaCluster,需要把源码拷贝过来,修改一下,因为里面有些方法是私有的。copy过来后改为public即可。
2.2 链接ZK
注意:这里的ZKStringSerializer,需要把源码拷贝过来,修改一下。
//zk
val zkClient = new ZkClient(prop.getProperty("zk.connect")
, Integer.MAX_VALUE,
100000, ZKStringSerializer)
val messageHandler = (mmd: MessageAndMetadata[String,
String]) =>
(mmd.topic, mmd.message()) |
2.3 组装fromOffsets
组装fromOffsets,createDirectStream接收的是一个map的结构,所以可以支持多个topic的消费。
var
fromOffsets: Map[TopicAndPartition, Long] =
Map()
//多个partition的offset
//支持多个topic : Set[String]
topics.foreach(topicName => {
//去brokers中获取partition数量,注意:新增partition后需要重启
val children = zkClient.countChildren(ZkUtils.getTopicPar
titionsPath(topicName))
for (i <- 0 until children) {
//kafka consumer 中是否有该partition的消费记录,如果没有设置为0
val tp = TopicAndPartition(topicName, i)
val path: String = s"${new ZKGroupTopicDirs(groupName,
topicName).consumerOffsetDir}/$i"
if (zkClient.exists(path)) {
fromOffsets += (tp -> zkClient.readData[String](path).toLong)
} else {
fromOffsets += (tp -> 0)
}
}
}) |
2.4 通过createDirectStream接受数据
使用KafkaUtils里面的createDirectStream方法去消费kafka数据,createDirectStream使用的是kafka简单的Consumer
API,所以需要自己去管理offset,我们把offset写入到zk中,这样也方便了一些监控软件读取记录。
//创建Kafka持续读取流,通过zk中记录的offset
val messages: InputDStream[(String, String)]
=
KafkaUtils.createDirectStream[String, String,
StringDecoder,
StringDecoder, (String,
String)](ssc, kafkaParams, fromOffsets,
messageHandler) |
2.5 入库
入库HBase:
//数据操作
messages.foreachRDD(rdd => {
val offsetsList: Array[OffsetRange] = rdd.asInstance
Of[HasOffsetRanges].offsetRanges
//data 处理
rdd.foreachPartition(partitionRecords =>
{
//TaskContext 上下文
val offsetRange: OffsetRange = offsetsList(TaskContext
.get.partitionId)
logger.info(s"${offsetRange.topic} ${offsetRange.partit
ion} ${offsetRange.fromOffset} ${offsetRange.untilOffset}")
//TopicAndPartition 主构造参数第一个是topic,第二个是Kafka
partition id
val topicAndPartition = TopicAndPartition(offsetRange.topic,
offsetRange.partition)
val either = kc.setConsumerOffsets(groupName,
Map((topicAnd
Partition, offsetRange.untilOffset))) //是
if (either.isLeft) {
logger.info(s"Error updating the offset
to Kafka cluster:
${either.left.get}")
}
partitionRecords.foreach(data => {
HBaseDao.insert(data)
})
})
}) |
插入数据到具体HBase数据库:
/**
*
* 插入数据到 HBase
*
* 参数( tableName , json ) ):
*
* Json格式:
* {
* "rowKey": "00000-0",
* "family:qualifier": "value",
* "family:qualifier": "value",
* ......
* }
*
* @param data
* @return
*/
def insert(data: (String, String)): Boolean
= {
val t: HTable = getTable(data._1) //HTable
try {
val map: mutable.HashMap[String, Object] =
JsonUtils.json2Map(data._2)
val rowKey: Array[Byte] = String.valueOf(map.
get("rowKey")).getBytes //rowKey
val put = new Put(rowKey)
for ((k, v) <- map) {
val keys: Array[String] = k.split(":")
if (keys.length == 2){
put.addColumn(keys(0).getBytes, keys(1).getBytes
, String.valueOf(v).getBytes)
}
}
Try(t.put(put)).getOrElse(t.close())
true
} catch {
case e: Exception =>
e.printStackTrace()
false
}
} |
2.6 运行并查看结果
运行命令:
/opt/cloudera/parcels/CDH/bin/spark-submit
--master
yarn-client --class
com.xiaoxiaomo.streaming.KafkaData
Stream hspark-1.0.jar
1 3 1000
|
运行后可以去spark UI中去查看相关运行情况,UI中具体细节见下文。
Streaming Statistics数据统计图
Completed Batches
三、对Streaming监控的介绍以及解决实际问题
这部分主要在代码运行起来的情况下来看一下任务的运行情况主要是streaming的监控界面,以及我们怎么去通过监控界面发现问题和解决问题。
3.1 监控
官网中指出,spark中专门为SparkStreaming程序的监控设置了额外的途径,当使用StreamingContext时,在WEB
UI中会出现一个”Streaming”的选项卡:
WEB UI中的“Streaming”选项卡
在此选项卡内,统计的内容展示如下:
Streaming 状态图
Spark streaming 处理速度为3s一次,每次1000条。
Kafka product 每秒1000条数据,与上面spark consumer消费者恰好相等。结果:数据量大导致积压,这个过程中active
Batches会越变越大。
因为忽略了实际的Processing time:
Active Batches
Completed Batches
Streaming Batches对应的趋势图
这其中包括接受的记录数量,每一个batch内处理的记录数,处理时间,以及总共消耗的时间。在上述参数之中最重要的两个参数分别是Porcessing
Time 以及 Scheduling Delay:
1.Porcessing Time用来统计每个batch内处理数据所消费的时间
2.Scheduling Delay用来统计在等待被处理所消费的时间
如果PT比SD大,或者SD持续上升,这就表明此系统不能对产生的数据实时响应,换句话来说就是,出现了处理时延,每个batch
time 内的处理速度小于数据的产生速度。
在这种情况下,读者需要想法减少数据的处理速度,即需要提升处理效率。
3.2 问题发现
在我做压测的时候, Spark streaming 处理速度为3s一次,每次1000条。
Kafka product每秒1000条数据, 与上面spark consumer消费者恰好相等。于是就会数据量大导致积压,这个过程中active
Batches会越变越大。最后发现了一个问题:
Streaming Batches对应的趋势图
当压测峰值过后Input Size=0 events,时间仍然不减,奇怪!
Streaming Batches一些异常情况图
查看摸个具体stage:
Streaming具体的stage信息
从图中, 我们可以看到Spark总共调度分发了两批次task set, 每个task set的处理(含序列化和压缩之类的工作)都不超过100毫秒,那么该Stage何来消耗4s呢?慢着,貌似这两批次的task
set分发的时间相隔得有点长啊,隔了4秒左右。为什么会隔这么就才调度一次呢?
此处要引入一个配置项“spark.locality.wait”(默认等待3s),它配置了本地化调度降级所需要的时间。这里概要补充下Spark本地化调度的知识,Spark的task一般都会分发到它所需数据的那个节点,这称之为”NODE_LOCAL”,但在资源不足的情况下,数据所在节点未必有资源处理task,因此Spark在等待了“spark.locality.wait”所配置的时间长度后,会退而求其次,分发到数据所在节点的同一个机架的其它节点上,这是“RACK_LOCAL”。当然,也有更惨的,就是再等了一段“spark.locality.wait”的时间长度后,干脆随便找一台机器去跑task,这就是“ANY”策略了。
Streaming 源码
而从上例看到, 即使用最差的”ANY”策略进行调度,task set的处理也只是花了100毫秒,因此,没必要非得为了”NODE_LOCAL”策略的生效而去等待那么长的时间,特别是在流计算这种场景上。所以把“spark.locality.wait”果断调小,从1秒到500毫秒,最后干脆调到100毫秒算了。
spark-submit
–master yarn-client –conf spark.driver.memory=256m
–class com.KafkaDataStream
–num-executors 1 –executor-memory
256m –executor-cores
2 –conf spark.locality.wait=100ms hspark-1.0.jar |
调了之后的处理时间为0.7s:
Streaming Completed
Batches正常
具体耗时如下:
Streaming 具体耗时信息图
四、对项目做压测与相关的优化
对项目做压测与相关的优化,主要从内存(executor-memory和driver-memory)、num-executors、executor-cores,以及代码层面做一些测试和改造。
4.1 压测
spark-submit
–master yarn-client –conf spark.
driver.memory=256m
–class com.xiaoxiaomo.KafkaDataStream
–num-executors 1
–executor-memory 256m –executor-cores 2
–conf spark.locality.wait=100ms
hspark.jar 3 1000 |
Spark streaming 处理速度为3s一次,每次1000条。
Kafka product 每秒1000条数据, 与上面spark consumer消费者恰好相等。结果:数据量大导致积压,这个过程中active
Batches会越变越大。
调整Kafka product 每秒600条数据,存在积压,但已经不严重:
Kafka product 每秒600条数据,存在积压
调整Kafka product 每秒500条数据,为消费者50%,测试结果显示正常,等待时间很稳定:
Kafka product 每秒500条数据,正常
但是。此时每秒吞吐量为500显然不够,通过调整间歇实际等,发现并没有变化:
spark-submit
–master yarn-client –conf spark.driver.memory=256m
–class com.xiaoxiaomo.KafkaDataStream –num-executors
1 –executor-memory 256m –executor-cores 2 –conf
spark.locality.wait=100ms hspark.jar 2 2000
Spark streaming 处理速度为2s一次,每次2000条 |
Kafka product 每秒500条数据,可以看见没有在指定时间内消费完数据,照成数据积压,并发下降了。
Kafka product 每秒500条数据,没有在指定时间内消费完
4.2 分析原因
分析原因,发现大部分耗时都在处理数据这样一阶段,如下图所示:
Streaming 时间分析图
4.3 调整参数
调整 executor-cores:
1.executor-cores 2 并发上升至700/s
2.executor-cores 3 并发上升至750/s
调整executor-cores后
1.调整executor内存,并发没有增长,无效:
executor-memory
512m
conf spark.yarn.executor.memoryOverhead=512 |
2.调整am内存,并发没有增长,无效:
am-memory
512m
conf spark.yarn.am.memoryOverhead=512 |
4.4 代码调整
发现现在主要还是在处理数据的时候消耗时间一直没有减少,而处理数据查看后发现是一条一条的往HBase里面插入的,修改为批量插入,重新构建了json.性能猛增!!修改前的代码:
/**
*
* 插入数据到 HBase
*
* 参数( tableName , json ) ):
*
* Json格式:
* {
* "rowKey": "00000-0",
* "family:qualifier": "value",
* "family:qualifier": "value",
* ......
* }
*
* @param data
* @return
*/
def insert(data: (String, String)): Boolean
= {
val t: HTable = getTable(data._1) //HTable
try {
val map: mutable.HashMap[String, Object] =
JsonUtils.json2Map(data._2)
val rowKey: Array[Byte] = String.valueOf(map
.get("rowKey")).getBytes
//rowKey
val put = new Put(rowKey)
for ((k, v) <- map) {
val keys: Array[String] = k.split(":")
if (keys.length == 2){
put.addColumn(keys(0).getBytes, keys(1).getBytes,
String.valueOf(v).getBytes)
}
}
Try(t.put(put)).getOrElse(t.close())
true
} catch {
case e: Exception =>
e.printStackTrace()
false
}
} |
修改后的代码:
//数据操作
messages.foreachRDD(rdd => {
val offsetsList: Array[OffsetRange] = rdd.asInstanceOf
[HasOffsetRanges].offsetRanges
//data 处理
rdd.foreachPartition(partitionRecords =>
{
//TaskContext 上下文
val offsetRange: OffsetRange = offsetsList(TaskContext
.get.partitionId)
logger.debug(s"${offsetRange.topic} ${offsetRange.partition}
${offsetRange.fromOffset}
${offsetRange.untilOffset}")
//TopicAndPartition 主构造参数第一个是topic,第二个是Kafka
partition id
val topicAndPartition = TopicAndPartition(offsetRange.topic,
offsetRange.partition)
val either = kc.setConsumerOffsets(groupName,
Map((topicAndPartition,
offsetRange.untilOffset))) //是
if (either.isLeft) {
logger.info(s"Error updating the offset
to Kafka cluster:
${either.left.get}")
}
/** 解析PartitionRecords数据 */
if (offsetRange.topic != null) {
HBaseDao.insert(offsetRange.topic, partitionRecords)
}
})
}) |
插入数据到HBase:
/**
*
* 插入数据到 HBase
*
* 参数( tableName , [( tableName , json )] ):
*
* Json格式:
* {
* "r": "00000-0",
* "f": "family",
* "q": [
* "qualifier",
* "qualifier"
* ...
* ],
* "v": [
* "value",
* "value"
* ...
* ],
* }
*
* @return
*/
def insert(tableName: String, array: Iterator
[(String, String)]):
Boolean = {
try {
/** 操作数据表 && 操作索引表 */
val t: HTable = getTable(tableName) //HTable
val puts: util.ArrayList[Put] = new util.ArrayList[Put]()
/** 遍历Json数组 */
array.foreach(json => {
val jsonObj: JSONObject = JSON.parseObject(json._2)
val rowKey: Array[Byte] = jsonObj.getString("r").getBytes
val family: Array[Byte] = jsonObj.getString("f").getBytes
val qualifiers: JSONArray = jsonObj.getJSONArray("q")
val values: JSONArray = jsonObj.getJSONArray("v")
val put = new Put(rowKey)
for (i <- 0 until qualifiers.size()) {
put.addColumn(family, qualifiers.getString(i).
getBytes, values.getString(i).getBytes)
}
puts.add(put)
})
Try(t.put(puts)).getOrElse(t.close())
true
} catch {
case e: Exception =>
e.printStackTrace()
logger.error(s"insert ${tableName} error
", e)
false
}
} |
4.5 运行
刚测试时给它相对很小的内存跑一跑:
[root@xiaoxiaomo.com ~]# /opt/cloudera/parcels
/CDH/bin/spark-submit
\
--master yarn-client --num-executors 1 \
--driver-memory 256m --conf spark.yarn.driver.
memoryOverhead=256
\
--conf spark.yarn.am.memory=256m --conf spark.yarn
.am.memoryOverhead=256
\
--executor-memory 256m --conf spark.yarn.executor
memoryOverhead=256
\
--executor-cores 1 \
--class com.creditease.streaming.KafkaDataStream
hspark-1.0.jar 1
3 30000 |
五六万的插入没什么压力,但是到10万的时候,就有些卡顿了!!
yarn 容器、cpu、内存大小
五六万的插入没什么压力
当然是需要增大内存的,修改配置,都增加一倍:
[root@xiaoxiaomo.com ~]# /opt/cloudera/parcels/
CDH/bin/spark-submit
\
--master yarn-client --num-executors 2 \
--driver-memory 512m --conf spark.yarn.driver
.memoryOverhead=512
\
--conf spark.yarn.am.memory=512m --conf spark.
yarn.am.memoryOverhead=512
\
--executor-memory 512m --conf spark.yarn.executor
.memoryOverhead=512
\
--executor-cores 1 \
--class com.creditease.streaming.KafkaDataStream
hspark-1.0.jar 1
3 30000 |
yarn 容器、cpu、内存大小
90000的插入没什么压力
查看插入数据量,能看到修改后插入数据10万是没有什么压力的:
查看插入数据量,能看到修改后插入数据10万是没有什么压力的
当我们再继续加大压力测试的时候,性能下降:
当我们再继续加大压力测试的时候,性能下降
查看统计信息:
查看统计信息 |