您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Modeler   Code  
会员   
 
   
 
 
     
   
 订阅
  捐助
Spark源码系列(八)Spark Streaming实例分析
 
作者 岑玉海的博客,火龙果软件    发布于 2014-11-12
   次浏览      
 

这一章要讲Spark Streaming,讲之前首先回顾下它的用法,具体用法请参照《Spark Streaming编程指南》。

Example代码分析

val ssc = new StreamingContext(sparkConf, Seconds(1));
// 获得一个DStream负责连接 监听端口:地址
val lines = ssc.socketTextStream(serverIP, serverPort);
// 对每一行数据执行Split操作
val words = lines.flatMap(_.split(" "));
// 统计word的数量
val pairs = words.map(word => (word, 1));
val wordCounts = pairs.reduceByKey(_ + _);
// 输出结果
wordCounts.print();
ssc.start(); // 开始
ssc.awaitTermination(); // 计算完毕退出

1、首先实例化一个StreamingContext

2、调用StreamingContext的socketTextStream

3、对获得的DStream进行处理

4、调用StreamingContext是start方法,然后等待

我们看StreamingContext的socketTextStream方法吧。

def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

1、StoageLevel是StorageLevel.MEMORY_AND_DISK_SER_2

2、使用SocketReceiver的bytesToLines把输入流转换成可遍历的数据

继续看socketStream方法,它直接new了一个

new SocketInputDStream[T](this, hostname, port, converter, storageLevel)

继续深入挖掘SocketInputDStream,追述一下它的继承关系,SocketInputDStream>>ReceiverInputDStream>>InputDStream>>DStream。

具体实现ReceiverInputDStream的类有好几个,基本上都是从网络端来数据的。

它实现了ReceiverInputDStream的getReceiver方法,实例化了一个SocketReceiver来接收数据。

SocketReceiver的onStart方法里面调用了receive方法,处理代码如下:

 socket = new Socket(host, port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next)
}

1、new了一个Socket来接收数据,用bytesToLines方法把InputStream转换成一行一行的字符串。

2、把每一行数据用store方法保存起来,store方法是从SocketReceiver的父类Receiver继承而来,内部实现是:

def store(dataItem: T) {
executor.pushSingle(dataItem)
}

executor是ReceiverSupervisor类型,Receiver的操作都是由它来处理。这里先不深纠,后面我们再说这个pushSingle的实现。

到这里我们知道lines的类型是SocketInputDStream,然后对它是一顿的转换,flatMap、map、reduceByKey、print,这些方法都不是RDD的那种方法,而是DStream独有的。

讲到上面这几个方法,我们开始转入DStream了,flatMap、map、reduceByKey、print方法都涉及到DStream的转换,这和RDD的转换是类似的。我们讲一下reduceByKey和print。

reduceByKey方法和RDD一样,调用的combineByKey方法实现的,不一样的是它直接new了一个ShuffledDStream了,我们接着看一下它的实现吧。

override def compute(validTime: Time): Option[RDD[(K,C)]] = {
parent.getOrCompute(validTime) match {
case Some(rdd) => Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
case None => None
}
}

在compute阶段,对通过Time获得的rdd进行reduceByKey操作。接下来的print方法也是一个转换:

new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()

打印前十个,超过10个打印"..."。需要注意register方法。

ssc.graph.addOutputStream(this)

它会把代码插入到当前的DStream添加到outputStreams里面,后面输出的时候如果没有outputStream就不会有输出,这个需要记住哦!

启动过程分析

前戏结束之后,ssc.start() 高潮开始了。 start方法很小,最核心的一句是JobScheduler的start方法。我们得转到JobScheduler方法上面去。

下面是start方法的代码:

def start(): Unit = synchronized {
  // 接受到JobSchedulerEvent就处理事件
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
def receive = {
case event: JobSchedulerEvent => processEvent(event)
}
}), "JobScheduler")

listenerBus.start()
receiverTracker = new ReceiverTracker(ssc)
receiverTracker.start()
jobGenerator.start()
}

1、启动了一个Actor来处理JobScheduler的JobStarted、JobCompleted、ErrorReported事件。

2、启动StreamingListenerBus作为监听器。

3、启动ReceiverTracker。

4、启动JobGenerator。

我们接下来看看ReceiverTracker的start方法。

def start() = synchronized {if (!receiverInputStreams.isEmpty) {
actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor), "ReceiverTracker")
receiverExecutor.start()
}
}

1、首先判断了一下receiverInputStreams不能为空,那receiverInputStreams是怎么时候写入值的呢?答案在SocketInputDStream的父类InputDStream当中,当实例化InputDStream的时候会在DStreamGraph里面添加InputStream。

abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) extends DStream[T](ssc_) {
ssc.graph.addInputStream(this)
//....
}

2、实例化ReceiverTrackerActor,它负责RegisterReceiver(注册Receiver)、AddBlock、ReportError(报告错误)、DeregisterReceiver(注销Receiver)等事件的处理。

3、启动receiverExecutor(实际类是ReceiverLauncher,这名字起得。。),它主要负责启动Receiver,start方法里面调用了startReceivers方法吧。

private def startReceivers() {
     // 对应着上面的那个例子,getReceiver方法获得是SocketReceiver
      val receivers = receiverInputStreams.map(nis => {
        val rcvr = nis.getReceiver()
        rcvr.setReceiverId(nis.id)
        rcvr
      })

      // 查看是否所有的receivers都有优先选择机器,
这个需要重写Receiver的preferredLocation方法,目前只有FlumeReceiver重写了
      val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)

      // 创建一个并行receiver集合的RDD, 把它们分散到各个worker节点上
      val tempRDD =
        if (hasLocationPreferences) {
          val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
          ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
        } else {
          ssc.sc.makeRDD(receivers, receivers.size)
        }

      // 在worker节点上启动Receiver的方法,遍历所有Receiver,然后启动
      val startReceiver = (iterator: Iterator[Receiver[_]]) => {
        if (!iterator.hasNext) {
          throw new SparkException("Could not start receiver as object not found.")
        }
        val receiver = iterator.next()
        val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
        executor.start()
        executor.awaitTermination()
      }
      // 运行这个重复的作业来确保所有的slave都已经注册了,避免所有的receivers都到一个节点上
      if (!ssc.sparkContext.isLocal) {
        ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
      }

      // 把receivers分发出去,启动
      ssc.sparkContext.runJob(tempRDD, startReceiver)
    }

1、遍历receiverInputStreams获取所有的Receiver。

2、查看这些Receiver是否全都有优先选择机器。

3、把SparkContext的makeRDD方法把所有Receiver包装到ParallelCollectionRDD里面,并行度是Receiver的数量。

4、发个小任务给确保所有的slave节点都已经注册了(这个小任务有点儿莫名其妙,感觉怪怪的)。

5、提交作业,启动所有Receiver。

Spark写得实在是太巧妙了,居然可以把Receiver包装在RDD里面,当做是数据来处理!

启动Receiver的时候,new了一个ReceiverSupervisorImpl,然后调的start方法,主要干了这么三件事情,代码就不贴了。

1、启动BlockGenerator。

2、调用Receiver的OnStart方法,开始接受数据,并把数据写入到ReceiverSupervisor。

3、调用onReceiverStart方法,发送RegisterReceiver消息给driver报告自己启动了。

保存接收到的数据

ok,到了这里,重点落到了BlockGenerator。前面说到SocketReceiver把接受到的数据调用ReceiverSupervisor的pushSingle方法保存。

// 这是ReceiverSupervisorImpl的方法
  def pushSingle(data: Any) {
    blockGenerator += (data)
  }
  // 这是BlockGenerator的方法
   def += (data: Any): Unit = synchronized {
    currentBuffer += data
  }

我们看一下它的start方法吧。

def start() {
blockIntervalTimer.start()
blockPushingThread.start()
}

它启动了一个定时器RecurringTimer和一个线程执行keepPushingBlocks方法。

先看RecurringTimer的实现:

while (!stopped) {
clock.waitTillTime(nextTime)
callback(nextTime)
prevTime = nextTime
nextTime += period
}

每隔一段时间就执行callback函数,callback函数是new的时候传进来的,是BlockGenerator的updateCurrentBuffer方法。

private def updateCurrentBuffer(time: Long): Unit = synchronized {
try {
val newBlockBuffer = currentBuffer
currentBuffer = new ArrayBuffer[Any]
if (newBlockBuffer.size > 0) {
val blockId = StreamBlockId(receiverId, time - blockInterval)
val newBlock = new Block(blockId, newBlockBuffer)
blocksForPushing.put(newBlock)
}
} catch {case t: Throwable =>
reportError("Error in block updating thread", t)
}
}

它new了一个Block出来,然后添加到blocksForPushing这个ArrayBlockingQueue队列当中。

提到这里,有两个参数需要大家注意的:

spark.streaming.blockInterval   默认值是200
spark.streaming.blockQueueSize 默认值是10

这是前面提到的间隔时间和队列的长度,间隔时间默认是200毫秒,队列是最多能容纳10个Block,多了就要阻塞了。

我们接下来看一下BlockGenerator另外启动的那个线程执行的keepPushingBlocks方法到底在干什么?

private def keepPushingBlocks() {
    while(!stopped) {
Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
case Some(block) => pushBlock(block)
case None =>
}
}
   // ...退出之前把剩下的也输出去了
}

它在把blocksForPushing中的block不停的拿出来,调用pushBlock方法,这个方法属于在实例化BlockGenerator的时候,从ReceiverSupervisorImpl传进来的BlockGeneratorListener的。

private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
    def onError(message: String, throwable: Throwable) {
      reportError(message, throwable)
    }

    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
      pushArrayBuffer(arrayBuffer, None, Some(blockId))
    }
  }, streamId, env.conf)

1、reportError,通过actor向driver发送错误报告消息ReportError。

2、调用pushArrayBuffer保存数据。

下面是pushArrayBuffer方法:

def pushArrayBuffer(arrayBuffer: ArrayBuffer[_], optionalMetadata: Option[Any], optionalBlockId: Option[StreamBlockId]
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val time = System.currentTimeMillis
blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], storageLevel, tellMaster = true)
reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
}

1、把Block保存到BlockManager当中,序列化方式为之前提到的StorageLevel.MEMORY_AND_DISK_SER_2(内存不够就写入到硬盘,并且在2个节点上保存的方式)。

2、调用reportPushedBlock给driver发送AddBlock消息,报告新添加的Block,ReceiverTracker收到消息之后更新内部的receivedBlockInfo映射关系。

处理接收到的数据

前面只讲了数据的接收和保存,那数据是怎么处理的呢?

之前一直讲ReceiverTracker,而忽略了之前的JobScheduler的start方法里面最后启动的JobGenerator。

def start(): Unit = synchronized {
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
def receive = {
case event: JobGeneratorEvent => processEvent(event)
}
}), "JobGenerator")
if (ssc.isCheckpointPresent) {
restart()
} else {
startFirstTime()
}
}

1、启动一个actor处理JobGeneratorEvent事件。

2、如果是已经有CheckPoint了,就接着上次的记录进行处理,否则就是第一次启动。

我们先看startFirstTime吧,CheckPoint以后再说吧,有点儿小复杂。

private def startFirstTime() {
val startTime = new Time(timer.getStartTime())
graph.start(startTime - graph.batchDuration)
timer.start(startTime.milliseconds)
}

1、timer.getStartTime计算出来下一个周期的到期时间,计算公式:(math.floor(clock.currentTime.toDouble / period) + 1).toLong * period,以当前的时间/除以间隔时间,再用math.floor求出它的上一个整数(即上一个周期的到期时间点),加上1,再乘以周期就等于下一个周期的到期时间。

2、启动DStreamGraph,启动时间=startTime - graph.batchDuration。

3、启动Timer,我们看看它的定义:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")

到这里就清楚了,DStreamGraph的间隔时间就是timer的间隔时间,启动时间要设置成比Timer早一个时间间隔,原因再慢慢探究。

可以看出来每隔一段时间,Timer给eventActor发送GenerateJobs消息,我们直接去看它的处理方法generateJobs吧,中间忽略了一步,大家自己看。

private def processEvent(event: JobGeneratorEvent) {
event match {
case GenerateJobs(time) => generateJobs(time)
case ClearMetadata(time) => clearMetadata(time)
case DoCheckpoint(time) => doCheckpoint(time)
case ClearCheckpointData(time) => clearCheckpointData(time)
}
}

下面是generateJobs方法。

private def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) =>
val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
val streamId = stream.id
val receivedBlockInfo = stream.getReceivedBlockInfo(time)
(streamId, receivedBlockInfo)
}.toMap
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventActor ! DoCheckpoint(time)
}

1、DStreamGraph生成jobs。

2、从stream那里获取接收到的Block信息。

3、调用submitJobSet方法提交作业。

4、提交完作业之后,做一个CheckPoint。

先看DStreamGraph是怎么生成的jobs。

 def generateJobs(time: Time): Seq[Job] = {
val jobs = this.synchronized {
outputStreams.flatMap(outputStream => outputStream.generateJob(time))
}
jobs
}

outputStreams在这个例子里面是print这个方法里面添加的,这个在前面说了,我们继续看DStream的generateJob。

private[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
}
case None => None
}
}

1、调用getOrCompute方法获得RDD

2、new了一个方法去提交这个作业,缺什么都不做

为什么呢?这是直接跳转的错误,呵呵,因为这个outputStream是print方法返回的,它应该是ForEachDStream,所以我们应该看的是它里面的generateJob方法。

override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
case None => None
}
}

这里请大家千万要注意,不要在这块被卡住了。

我们看看它这个RDD是怎么出来的吧。

private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
    // If this DStream was not initialized (i.e., zeroTime not set), then do it
    // If RDD was already generated, then retrieve it from HashMap
    generatedRDDs.get(time) match {

      // 这个RDD已经被生成过了,直接用就是了
      case Some(oldRDD) => Some(oldRDD)

      // 还没生成过,就调用compte函数生成一个
      case None => {
        if (isTimeValid(time)) {
          compute(time) match {
            case Some(newRDD) =>
         // 设置保存的级别
              if (storageLevel != StorageLevel.NONE) {
                newRDD.persist(storageLevel)
              }
         // 如果现在需要,就做CheckPoint
              if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
                newRDD.checkpoint()
              }
         // 添加到generatedRDDs里面去,可以再次利用
              generatedRDDs.put(time, newRDD)
              Some(newRDD)
            case None =>
              None
          }
        } else {
          None
        }
      }
    }
  }

从上面的方法可以看出来它是通过每个DStream自己实现的compute函数得出来的RDD。我们找到SocketInputDStream,没有compute函数,在父类ReceiverInputDStream里面找到了。

override def compute(validTime: Time): Option[RDD[T]] = {
// 如果出现了时间比startTime早的话,就返回一个空的RDD,因为这个很可能是master挂了之后的错误恢复
if (validTime >= graph.startTime) {
val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
receivedBlockInfo(validTime) = blockInfo
val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
}
}

通过DStream的id把receiverTracker当中把接收到的block信息全部拿出来,记录到ReceiverInputDStream自身的receivedBlockInfo这个HashMap里面,就把RDD返回了,RDD里面实际包含的是Block的id的集合。

现在我们就可以回到之前JobGenerator的generateJobs方法,我们就清楚它这句是提交的什么了。

jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))

JobSet是记录Job的完成情况的,直接看submitJobSet方法吧。

def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
    } else {
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    }
  }

遍历jobSet里面的所有jobs,通过jobExecutor这个线程池提交。我们看一下JobHandler就知道了。

private class JobHandler(job: Job) extends Runnable {
def run() {
eventActor ! JobStarted(job)
job.run()
eventActor ! JobCompleted(job)
}
}

1、通知eventActor处理JobStarted事件。

2、运行job。

3、通知eventActor处理JobCompleted事件。

这里的重点是job.run,事件处理只是更新相关的job信息。

def run() {
result = Try(func())
}

在遍历BlockRDD的时候,在compute函数获取该Block(详细请看BlockRDD),然后对这个RDD的结果进行打印。

到这里就算结束了,最后来个总结吧,这一章只是过程分析:

1、可以有多个输入,我们可以通过StreamingContext定义多个输入,比如我们监听多个(host,ip),可以给它们定义各自的处理逻辑和输出,输出方式不仅限于print方法,还可以有别的方法,saveAsTextFiles和saveAsObjectFiles。这块的设计是支持共享StreamingContext的。

2、StreamingContext启动了JobScheduler,JobScheduler启动ReceiverTracker和JobGenerator。

3、ReceiverTracker是通过把Receiver包装成RDD的方式,发送到Executor端运行起来的,Receiver起来之后向ReceiverTracker发送RegisterReceiver消息。

3、Receiver把接收到的数据,通过ReceiverSupervisor保存。

4、ReceiverSupervisorImpl把数据写入到BlockGenerator的一个ArrayBuffer当中。

5、BlockGenerator内部每个一段时间(默认是200毫秒)就把这个ArrayBuffer构造成Block添加到blocksForPushing当中。

6、BlockGenerator的另外一条线程则不断的把加入到blocksForPushing当中的Block写入到BlockManager当中,并向ReceiverTracker发送AddBlock消息。

7、JobGenerator内部有个定时器,定期生成Job,通过DStream的id,把ReceiverTracker接收到的Block信息从BlockManager上抓取下来进行处理,这个间隔时间是我们在实例化StreamingContext的时候传进去的那个时间,在这个例子里面是Seconds(1)。

   
次浏览       
相关文章

基于EA的数据库建模
数据流建模(EA指南)
“数据湖”:概念、特征、架构与案例
在线商城数据库系统设计 思路+效果
 
相关文档

Greenplum数据库基础培训
MySQL5.1性能优化方案
某电商数据中台架构实践
MySQL高扩展架构设计
相关课程

数据治理、数据架构及数据标准
MongoDB实战课程
并发、大容量、高性能数据库设计与优化
PostgreSQL数据库实战培训
最新活动计划
LLM大模型应用与项目构建 12-26[特惠]
QT应用开发 11-21[线上]
C++高级编程 11-27[北京]
业务建模&领域驱动设计 11-15[北京]
用户研究与用户建模 11-21[北京]
SysML和EA进行系统设计建模 11-28[北京]

MySQL索引背后的数据结构
MySQL性能调优与架构设计
SQL Server数据库备份与恢复
让数据库飞起来 10大DB2优化
oracle的临时表空间写满磁盘
数据库的跨平台设计
更多...   


并发、大容量、高性能数据库
高级数据库架构设计师
Hadoop原理与实践
Oracle 数据仓库
数据仓库和数据挖掘
Oracle数据库开发与管理


GE 区块链技术与实现培训
航天科工某子公司 Nodejs高级应用开发
中盛益华 卓越管理者必须具备的五项能力
某信息技术公司 Python培训
某博彩IT系统厂商 易用性测试与评估
中国邮储银行 测试成熟度模型集成(TMMI)
中物院 产品经理与产品管理
更多...