Spark Source Code Analysis: DStream

Author: dantezhao
Original source

0x00 Preface
This is the second post in the Spark source code series. It uses the source to analyze the single most important concept in Spark Streaming's design: DStream.
The focus is Spark Streaming's DStream. Its importance needs little explanation: once you understand these few core data structures, it becomes much easier to get an overall grasp of Spark.
Like the RDD post, although this is nominally about DStream, the whole article is organized around one concrete example, so it also serves as an overview of the Spark Streaming source.
Article structure
  • Some Spark Streaming concepts, mainly the ones related to DStream
  • The overall design of DStream
  • A deeper walkthrough driven by a concrete example
0x01 Concepts
What is Spark Streaming?
Scalable, high-throughput, fault-tolerant stream processing of live data streams!
A real-time, or rather near-real-time, system; I won't go into more detail here.
One point worth making: Streaming jobs are ultimately translated into Spark jobs and executed by the Spark engine.
DStream
It represents a continuous stream of data, either the input data stream received from source, or the processed data stream generated by transforming the input stream.

An RDD is defined as a read-only, partitioned dataset (an RDD is a read-only, partitioned collection of records), while a DStream is in turn a template for RDDs, so we can also regard a DStream as a dataset.
My simple mental model: a DStream is a data structure that wraps another layer on top of RDDs. The official site has a figure describing DStream (not reproduced here); a toy sketch of the same idea follows.
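To make the "template" intuition concrete, here is a tiny toy sketch. It is not Spark code (MiniDStream, BatchTime and everything else in it are invented for illustration); it only shows the idea that a DStream maps each batch time to one dataset and caches whatever it has already produced.

[Scala]
import scala.collection.mutable

// Toy model, NOT Spark source: every name here is made up for illustration.
// A "DStream" is essentially a recipe that, given a batch time, either returns
// the dataset it already generated for that time or computes and caches a new one.
object MiniDStreamDemo {
  final case class BatchTime(ms: Long)

  class MiniDStream[T](compute: BatchTime => Seq[T]) {
    // Stands in for DStream.generatedRDDs: HashMap[Time, RDD[T]]
    private val generated = mutable.HashMap.empty[BatchTime, Seq[T]]
    def getOrCompute(t: BatchTime): Seq[T] = generated.getOrElseUpdate(t, compute(t))
    // A transformation just wraps its parent, much like MappedDStream wraps its parent DStream
    def map[U](f: T => U): MiniDStream[U] = new MiniDStream(t => getOrCompute(t).map(f))
  }

  def main(args: Array[String]): Unit = {
    val lines   = new MiniDStream[String](t => Seq(s"batch at ${t.ms} ms"))
    val lengths = lines.map(_.length)
    println(lengths.getOrCompute(BatchTime(1000))) // parent computed once, then cached
  }
}
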
How Spark Streaming differs from other real-time processing systems
This part comes from the paper by the Spark authors. It is very well written, so rather than restating it I'll quote it directly and only pull out the points I care about.
The paper divides real-time processing frameworks into two kinds: record-at-a-time and the D-Stream processing model.
Record-at-a-time:
Record-at-a-time processing model. Each node continuously receives records, updates internal state, and sends new records. Fault tolerance is typically achieved through replication, using a synchronization protocol like Flux or DPC to ensure that replicas of each node see records in the same order (e.g., when they have multiple parent nodes).
D-Stream processing model:
D-Stream processing model. In each time interval, the records that arrive are stored reliably across the cluster to form an immutable, partitioned dataset. This is then processed via deterministic parallel operations to compute other distributed datasets that represent program output or state to pass to the next interval. Each series of datasets forms one D-Stream.
The problem with record-at-a-time:
In a record-at-a-time system, the major recovery challenge is rebuilding the state of a lost, or slow, node.
0x02 Source Code Analysis
DStream
A DStream internally is characterized by a few basic properties:
  • A list of other DStreams that the DStream depends on
  • A time interval at which the DStream generates an RDD
  • A function that is used to generate an RDD after each time interval
Three things in the DStream data structure matter most:
  • its parent dependencies
  • the time interval at which it generates RDDs
  • a function that generates an RDD
These map to the code below. All of them are implemented by concrete subclasses, as we will see later in the analysis. Now let's follow the example step by step.
[Scala]
abstract class DStream[T: ClassTag] ( @transient private[streaming] var ssc: StreamingContext ) extends Serializable with Logging {
  /** Time interval after which the DStream generates an RDD */
  def slideDuration: Duration
  /** List of parent DStreams on which this DStream depends on */
  def dependencies: List[DStream[_]]
  /** Method that generates an RDD for the given time */
  def compute(validTime: Time): Option[RDD[T]]
  // RDDs generated, marked as private[streaming] so that testsuites can access it
  @transient
  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
  // Reference to whole DStream graph
  private[streaming] var graph: DStreamGraph = null
 }



An example
The most basic wordcount example from the official site, similar to the Spark one. Simple, but very representative.
[Scala]
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate


This already involves transformations between DStreams as well as RDD generation. Let's first look at the DStream transformations.
DStream dependencies
It is worth getting the DStream dependency relationships straight first, otherwise the rest is hard to follow. The full DStream dependency graph is large; we only list the parts relevant to this post.
I won't introduce every component in detail here, just one diagram; you can come back to it while reading the source and it will be clearer then. Since the diagram doesn't reproduce here, a text version follows.
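The sketch below stands in for that diagram, limited to the classes this article actually touches: the inheritance chain on the input side, and the parent-to-child chain of DStreams that the wordcount example builds.

[Scala]
// Text stand-in for the missing diagram (only the classes discussed in this post).
//
// Inheritance (input side):
//   DStream
//     └── InputDStream
//           └── ReceiverInputDStream
//                 └── SocketInputDStream        (what socketTextStream returns)
//
// Parent -> child dependencies built by the wordcount example:
//   SocketInputDStream[String]                  // ssc.socketTextStream(...)
//     -> FlatMappedDStream[String, String]      // lines.flatMap(...)
//       -> MappedDStream[String, (String, Int)] // words.map(...)
//         -> ShuffledDStream[String, Int, Int]  // pairs.reduceByKey(...)
//           -> ForEachDStream[(String, Int)]    // wordCounts.print()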

1. Source code analysis: the StreamingContext class

The main components of StreamingContext. I won't expand here on what StreamingContext itself does; we'll follow the concrete example first, and later posts will analyze the main components such as DStreamGraph and JobGenerator in their own right. (A rough sketch of how these components drive each other follows the constructor snippet below.)
  • JobScheduler: used to periodically generate Spark jobs
  • JobGenerator
  • JobExecutor
  • DStreamGraph: the container that holds the dependency relationships between DStreams
  • StreamingJobProgressListener: listens to Streaming jobs and updates the StreamingTab
  • StreamingTab: the tab page for Streaming jobs, displayed by the SparkUI

[Scala]
class StreamingContext private[streaming] (
    _sc: SparkContext,
    _cp: Checkpoint,
    _batchDur: Duration
  ) extends Logging {...}
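
To connect these components, here is a rough sketch of the driver-side flow once the example's ssc.start() is called. It is abbreviated from my reading of the Spark source, not a verbatim call chain, so take it as orientation rather than reference.

[Scala]
// Rough, simplified driver-side flow (abbreviated, not verbatim Spark source):
//
//   ssc.start()
//     -> JobScheduler.start()                    // starts the receiver tracker and the JobGenerator
//          -> JobGenerator.start()               // a timer fires once per batchDuration
//               -> DStreamGraph.generateJobs(time)
//                    -> for each output DStream (e.g. the ForEachDStream behind print()):
//                         outputStream.generateJob(time)
//                           -> parent.getOrCompute(time)   // walks up the DStream chain,
//                                                          // ultimately calling compute(time)
//   The generated Jobs are then handed to JobScheduler's job executor pool and
//   submitted through the SparkContext, i.e. they run as ordinary Spark jobs.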



First, let's see what the first line of code does: val lines = ssc.socketTextStream("localhost", 9999). If you have read the RDD source post you'll remember that a single line like this already sets a number of DStream constructions in motion; let's go through them one by one.
socketTextStream returns a SocketInputDStream. So what is a SocketInputDStream?
[Scala]
def socketTextStream(
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
    socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
  }

  def socketStream[T: ClassTag](
      hostname: String,
      port: Int,
      converter: (InputStream) => Iterator[T],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[T] = {
    new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
  }


2. Source code analysis: the SocketInputDStream class


Here we can see that SocketInputDStream actually extends ReceiverInputDStream; this is the first level of the inheritance relationship, and you can look back at the earlier diagram.
It doesn't do much by itself: it mainly supplies its own SocketReceiver, while the other important methods are inherited from ReceiverInputDStream.
[Scala]
class SocketInputDStream[T: ClassTag](
    _ssc: StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](_ssc) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
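
The SocketReceiver it constructs is not shown here. From the Spark source, its job is essentially to open a TCP socket when started, turn the byte stream into records via bytesToObjects, and hand each record to Spark with store(). The sketch below is a simplified, self-contained paraphrase (SocketReceiverSketch is an invented name, and the real receiver also handles restarts and the isStopped flag).

[Scala]
import java.io.InputStream
import java.net.Socket

// Simplified paraphrase of what SocketReceiver does (invented class name, not Spark source):
// connect, convert the byte stream into records, and push each record to the framework.
class SocketReceiverSketch[T](host: String, port: Int,
    bytesToObjects: InputStream => Iterator[T]) {

  /** `store` stands in for Receiver.store(), which hands data to Spark's block generator. */
  def receive(store: T => Unit): Unit = {
    val socket = new Socket(host, port)
    try {
      val iterator = bytesToObjects(socket.getInputStream)
      while (iterator.hasNext) {
        store(iterator.next())
      }
    } finally {
      socket.close()
    }
  }
}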



3. Source code analysis: the ReceiverInputDStream class


ReceiverInputDStream is a fairly important class: a large share of input DStreams extend it, for example Kafka's input DStream. So it is a key class to understand.
Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] that has to start a receiver on worker nodes to receive external data. Specific implementations of ReceiverInputDStream must define [[getReceiver]] function that gets the receiver object of type [[org.apache.spark.streaming.receiver.Receiver]] that will be sent to the workers to receive data.
Note: it overrides the important compute method, which determines how the RDD for each batch is generated.
In addition, ReceiverInputDStream itself extends InputDStream.


[Scala]
abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext)
  extends InputDStream[T](_ssc) {
  /**
   * Generates RDDs with blocks received by the receiver of this stream. */
  override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {

      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }
  // ... (createBlockRDD and the rest of the class are omitted here)
}



4. Source code analysis: the InputDStream class


InputDStream is an important abstraction: it is the abstract base class of all input-related DStreams, for example FileInputDStream and the ReceiverInputDStream we just looked at.
This is the abstract base class for all input streams. This class provides methods start() and stop() which are called by Spark Streaming system to start and stop receiving data, respectively.
Input streams that can generate RDDs from new data by running a service/thread only on the driver node (that is, without running a receiver on worker nodes), can be implemented by directly inheriting this InputDStream.
For example, FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for new files and generates RDDs with the new files.
For implementing input streams that requires running a receiver on the worker nodes, use [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class.

[Scala]
abstract class InputDStream[T: ClassTag](_ssc: StreamingContext) extends DStream[T](_ssc) {
override def dependencies: List[DStream[_]] = List()

  override def slideDuration: Duration = {
    if (ssc == null) throw new Exception("ssc is null")
    if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
    ssc.graph.batchDuration
  }
...
}


Note: only at this point have we finished with the first line of code, the one that reads the data.

5. Source code analysis: DStream.flatMap (and how a DStream generates RDDs)


DStream itself was introduced above, so I won't repeat that; from here on we follow the example in order.
Look at our first transformation, flatMap. It returns a FlatMappedDStream and passes in a function.

[Scala]
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
    new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
  }


Next we turn to FlatMappedDStream, which brings in how the RDD is actually generated.

[Scala]
class FlatMappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    flatMapFunc: T => TraversableOnce[U]
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)
  override def slideDuration: Duration = parent.slideDuration
  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
  }
}


How does a DStream generate RDDs?
Get the RDD corresponding to the given time; either retrieve it from cache or compute-and-cache it.
Internally, a DStream uses a HashMap-typed member, generatedRDDs, to record the RDDs it has already generated.
Note: compute(time) is what actually produces the RDD.


[Scala]
  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // Fetch the RDD for this time from generatedRDDs: if it exists, return it; otherwise run the orElse block below
    generatedRDDs.get(time).orElse {
      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
      // of RDD generation, else generate nothing.
      // Check whether this time is valid
      if (isTimeValid(time)) {
        // Call compute(time) here to obtain the RDD instance, and store it in rddOption
        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details. We need to have this call here because
          // compute() might cause Spark jobs to be launched.
          // (Author's note) This lives in the RDD code; I didn't fully follow it, but from its comment it roughly means: skip the check for existing output directories.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            compute(time)
          }
        }

        rddOption.foreach { case newRDD =>
          // Register the generated RDD for caching and checkpointing
          if (storageLevel != StorageLevel.NONE) {
            newRDD.persist(storageLevel)
            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
          }
          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
            newRDD.checkpoint()
            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
          }
          // Put the RDD that was just created into generatedRDDs under this time
          generatedRDDs.put(time, newRDD)
        }
        rddOption
      } else {
        None
      }
    }
  }

  
6. Source code analysis: DStream.map


[Scala]
/** Return a new DStream by applying a function to all elements of this DStream. */
  def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
    new MappedDStream(this, context.sparkContext.clean(mapFunc))
  }


One thing worth spelling out here is the compute function, parent.getOrCompute(validTime).map(_.map[U](mapFunc)). It again goes through DStream's getOrCompute: if the parent DStream has already generated its RDD for validTime, that RDD is simply fetched from the parent's generatedRDDs rather than recomputed; otherwise it is computed and cached there.
After that, the .map(_.map[U](mapFunc)) part applies mapFunc on top of the parent's RDD.

[Scala]
class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}

7. Source code analysis: reduceByKey

With the experience of reading the RDD source, it is easy to find that reduceByKey lives in the PairDStreamFunctions class. Let's look at its source.
Return a new DStream by applying reduceByKey to each RDD. The values for each key are merged using the supplied reduce function. org.apache.spark.Partitioner is used to control the partitioning of each RDD.

[Scala]
 def reduceByKey(
      reduceFunc: (V, V) => V,
      partitioner: Partitioner): DStream[(K, V)] = ssc.withScope {
    combineByKey((v: V) => v, reduceFunc, reduceFunc, partitioner)
  }


Combine elements of each key in DStream's RDDs using custom functions. This is similar to the combineByKey for RDDs.
Here we start to recognize the pattern: the design is strikingly consistent with that of RDDs.
This brings in a ShuffledDStream. The actual shuffle process can get a bit involved, so I won't cover it here; the shuffle internals deserve a more detailed look of their own. (A short paraphrase of ShuffledDStream follows the combineByKey snippet below.)


[Scala]
 def combineByKey[C: ClassTag](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiner: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope {
    val cleanedCreateCombiner = sparkContext.clean(createCombiner)
    val cleanedMergeValue = sparkContext.clean(mergeValue)
    val cleanedMergeCombiner = sparkContext.clean(mergeCombiner)
    new ShuffledDStream[K, V, C](
      self,
      cleanedCreateCombiner,
      cleanedMergeValue,
      cleanedMergeCombiner,
      partitioner,
      mapSideCombine)
  }
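
For completeness, ShuffledDStream itself is short. The excerpt-style paraphrase below is reconstructed from memory of the Spark source and may differ in minor details, but the point stands: its compute just takes the parent's RDD for the batch and calls the ordinary RDD combineByKey on it, which is where the shuffle actually comes from.

[Scala]
// Paraphrased from memory of ShuffledDStream; may not be verbatim Spark source.
class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
    parent: DStream[(K, V)],
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiner: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true
  ) extends DStream[(K, C)](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[(K, C)]] = {
    parent.getOrCompute(validTime) match {
      // The shuffle happens inside the plain RDD combineByKey call below
      case Some(rdd) => Some(rdd.combineByKey[C](
        createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
      case None => None
    }
  }
}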
  


8. Source code analysis: DStream.print


The final print call is also interesting: it goes through DStream's own print function.
The line firstNum.take(num).foreach(println) is what prints the contents of the RDD.

[Scala]
  def print(num: Int): Unit = ssc.withScope {
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum = rdd.take(num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println(s"Time: $time")
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }




And then?
I also ran into a new DStream here: ForEachDStream. Going by its comment, the print above should produce a ForEachDStream. print itself ends in a call to foreachRDD (visible in the snippet above), which is presumably where that happens, but I didn't track down the exact code, so I'll set that aside for now; a hedged guess at the relevant snippet follows the class below.
An internal DStream used to represent output operations like DStream.foreachRDD.


[Scala]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
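
As promised above, here is my best guess at the missing link. As far as I recall from the Spark source, the private foreachRDD overload in DStream is where the ForEachDStream gets created and registered as an output stream on the DStreamGraph. Paraphrased from memory, so treat it as a sketch rather than an exact quote of the source.

[Scala]
// Paraphrase of DStream's private foreachRDD overload (from memory, possibly not verbatim):
// it wraps `this` in a ForEachDStream and registers it, which is what makes print()
// an output operation that DStreamGraph will generate jobs for.
private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false),
    displayInnerRDDOps).register()
}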



0x03 Summary
This wraps up the walkthrough of the DStream-related source. Like the RDD post, it is fairly basic and mainly maps out the overall flow; follow-up posts will dig into the finer points.

References
  • Matei Zaharia's paper (the paper is genuinely well written)
  • http://spark.apache.org/
  • https://github.com/lw-lin/CoolplaySpark/tree/master/Spark%20Streaming%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%E7%B3%BB%E5%88%97
