最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

Spark Streaming

[复制链接]
跳转到指定楼层
楼主
发表于 2019-8-20 15:11:43 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
概述
Spark Streaming 是 Spark Core API 的扩展,它支持弹性的,高吞吐的,容错的实时数据流的处理。数据可以通过多种数据源获取,例如 Kafka,Flume,Kinesis 以及 TCP sockets,也可以通过例如 map,reduce,join,window等的高级函数组成的复杂算法处理。最终,处理后的数据可以输出到文件系统,数据库以及实时仪表盘中。

在内部,它工作原理如下,Spark Streaming 接收实时输入数据流并将数据切分成多个 batch(批)数据,然后由 Spark 引擎处理它们以生成最终的 stream of results in batches(分批流结果)。

Spark Streaming 提供了一个名为 discretized streamDStream 的高级抽象,它代表一个连续的数据流。DStream 可以从数据源的输入数据流创建,例如 Kafka,Flume 以及 Kinesis,或者在其他 DStream 上进行高层次的操作以创建。在内部,一个 DStream 是通过一系列的 RDDs 来表示。
Note(注意): 在 Python 有些 API 可能会有不同或不可用。
入门示例
在我们详细介绍如何编写你自己的 Spark Streaming 程序的细节之前,让我们先来看一看一个简单的 Spark Streaming 程序的样子。比方说,我们想要计算从一个监听 TCP socket 的数据服务器接收到的文本数据(text data)中的字数。
首先,我们导入了 Spark Streaming 类和部分从 StreamingContext 隐式转换到我们的环境的名称,目的是添加有用的方法到我们需要的其他类(如 DStream)。StreamingContext 是所有流功能的主要入口点。我们创建了一个带有 2 个执行线程和间歇时间为 1 秒的本地 StreamingContext。
import org.apache.spark._import org.apache.spark.streaming._import org.apache.spark.streaming.StreamingContext._
/ 自从 Spark 1.3 开始,不再是必要的了 // 创建一个具有两个工作线程(working thread)并且批次间隔为 1 秒的本地 StreamingContext。// master 需要 2 个核,以防止饥饿情况(core不够用)(starvation scenario)。val conf = new SparkConf().setMaster("local[2]")
          .setAppName("NetworkWordCount")val ssc = new StreamingContext(conf, Seconds(1))

Using this context, we can create a DStream that represents streaming data from a TCP source, specified as hostname (e.g. localhost) and port (e.g. 9999)。使用该 context,我们可以创建一个代表从 TCP 源流数据的离散流(DStream),指定主机名(hostname)(例如 localhost)和端口(例如 9999)。

// 创建一个将要连接到 hostname:port 的 DStream,如 localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
上一步的这个 lines DStream 表示将要从数据服务器接收到的数据流。在这个离散流(DStream)中的每一条记录都是一行文本(text)。接下来,我们想要通过空格字符(space characters)拆分这些数据行(lines)成单词(words)。

// 将每一行拆分成 words(单词)val words = lines.flatMap(_.split(" "))

flatMap 是一种 one-to-many(一对多)的离散流(DStream)操作,它会通过在源离散流(source DStream)中根据每个记录(record)生成多个新纪录的形式创建一个新的离散流(DStream)。在这种情况下,在这种情况下,每一行(each line)都将被拆分成多个单词(words)和代表单词离散流(words DStream)的单词流。接下来,我们想要计算这些单词。

import org.apache.spark.streaming.StreamingContext._
// not necessary since Spark 1.3// 计算每一个 batch(批次)中的每一个 word(单词)val pairs = words.map(word => (word, 1))val wordCounts = pairs.reduceByKey(_ + _)// 在控制台打印出在这个离散流(DStream)中生成的每个 RDD 的前十个元素// 注意:必需要触发 action(很多初学者会忘记触发 action 操作,导致报错:No output operations registered, so nothing to execute)wordCounts.print()

上一步的 words DStream 进行了进一步的映射(一对一的转换)为一个 (word, 1) paris 的离散流(DStream),这个 DStream 然后被规约(reduce)来获得数据中每个批次(batch)的单词频率。最后,wordCounts.print() 将会打印一些每秒生成的计数。
Note that when these lines are executed, Spark Streaming only sets up the computation it will perform when it is started, and no real processing has started yet. To start the processing after all the transformations have been setup, we finally call
请注意,当这些行(lines)被执行的时候,Spark Streaming 仅仅设置了计算,只有在启动时才会执行,并没有开始真正地处理。为了在所有的转换都已经设置好之后开始处理,我们在最后调用:
ssc.start()             // 开始计算
ssc.awaitTermination()  // 等待计算被中断
如果你已经 下载 并且 构建 Spark,您可以使用如下方式来运行该示例。你首先需要运行 Netcat(一个在大多数类 Unix 系统中的小工具)作为我们使用的数据服务器。
$ nc -lk 9999
然后,在运行在 netcat 服务器上的终端输入的任何行(lines),都将被计算,并且每一秒都显示在屏幕上,它看起来就像下面这样:
计算的结果都是以batch为单位的。

基础概念依赖
与 Spark 类似,Spark Streaming 可以通过 Maven 来管理依赖。为了编写你自己的 Spark Streaming 程序,你必须添加以下的依赖到你的 SBT 或者 Maven 项目中。
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0"
针对从 Spark Streaming Core API 中不存在的数据源中获取数据,如 Kafka,Flume,Kinesis,你必须添加相应的坐标 spark-streaming-xyz_2.11 到依赖中。例如,有一些常见的依赖如下。
初始化 StreamingContext
为了初始化一个 Spark Streaming 程序,一个 StreamingContext 对象必须要被创建出来,它是所有的 Spark Streaming 功能的主入口点。
一个 StreamingContext 对象可以从一个 SparkConf 对象中来创建。
import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
这个 appName 参数是展示在集群 UI 界面上的应用程序的名称。master 是一个 Spark, Mesos or YARN cluster URL,或者一个特殊的 “local
  • ” 字符串以使用 local mode(本地模式)来运行。在实践中,当在集群上运行时,你不会想在应用程序中硬编码 master,而是 使用 spark-submit 来启动应用程序,并且接受该参数。然而,对于本地测试和单元测试,你可以传递 “local
  • ” 来运行 Spark Streaming 进程(检测本地系统中内核的个数)。请注意,做个内部创建了一个 SparkContext(所有 Spark 功能的出发点),它可以像 ssc.sparkContext 这样被访问。

  • notes:在定义一个 context 之后,您必须执行以下操作。
    • 通过创建输入 DStreams 来定义输入源。
    • 通过应用转换和输出操作 DStreams 定义流计算(streaming computations)。
    • 开始接收输入并且使用 streamingContext.start() 来处理数据。
    • 使用 streamingContext.awaitTermination() 等待处理被终止(手动或者由于任何错误)。
    • 使用 streamingContext.stop() 来手动的停止处理。

    需要记住的几点:
    • 一旦一个 context 已经启动,将不会有新的数据流的计算可以被创建或者添加到它。
    • 一旦一个 context 已经停止,它不会被重新启动。
    • 同一时间内在 JVM 中只有一个 StreamingContext 可以被激活。
    • 在 StreamingContext 上的 stop() 同样也停止了 SparkContext。为了只停止 StreamingContext,设置 stop() 的可选参数,名叫 stopSparkContext 为 false。
    • 一个 SparkContext 就可以被重用以创建多个 StreamingContexts,只要前一个 StreamingContext 在下一个StreamingContext 被创建之前停止(不停止 SparkContext)。


    Discretized Streams (DStreams)(离散化流)
    Discretized Stream or DStream 是 Spark Streaming 提供的基本抽象。它代表了一个连续的数据流,无论是从 source(数据源)接收到的输入数据流,还是通过转换输入流所产生的处理过的数据流。在内部,一个 DStream 被表示为一系列连续的 RDDs,它是 Spark 中一个不可改变的抽象,在一个 DStream 中的每个 RDD 包含来自一定的时间间隔的数据,如下图所示。
    应用于 DStream 的任何操作转化为对于底层的 RDDs 的操作。例如,在 先前的示例,转换一个行(lines)流成为单词(words)中,flatMap 操作被应用于在行离散流(lines DStream)中的每个 RDD 来生成单词离散流(words DStream)的 RDDs。如下所示。
    这些底层的 RDD 变换由 Spark 引擎(engine)计算。DStream 操作隐藏了大多数这些细节并为了方便起见,提供给了开发者一个更高级别的 API。

    Input DStreams 和 Receivers(接收器)
    输入 DStreams 是代表输入数据是从流的源数据(streaming sources)接收到的流的 DStream。在 一个入门示例 中,lines 是一个 input DStream,因为它代表着从 netcat 服务器接收到的数据的流。每一个 input DStream(除了 file stream 之外,会在本章的后面来讨论)与一个 Receiver (Scala doc,Java doc) 对象关联,它从 source(数据源)中获取数据,并且存储它到 Sparl 的内存中用于处理。
    Spark Streaming 提供了两种内置的 streaming source(流的数据源)。
    • Basic sources(基础的数据源):在 StreamingContext API 中直接可以使用的数据源。例如:file systems 和 socket connections。
    • Advanced sources(高级的数据源):像 Kafka,Flume,Kinesis,等等这样的数据源。可以通过额外的 utility classes 来使用。像在 依赖 中讨论的一样,这些都需要额外的外部依赖。

    在本节的后边,我们将讨论每种类别中的现有的一些数据源。
    请注意,如果你想要在你的流处理程序中并行的接收多个数据流,你可以创建多个 input DStreams(在 性能优化 部分进一步讨论)。这将创建同时接收多个数据流的多个 receivers(接收器)。但需要注意,一个 Spark 的 worker/executor 是一个长期运行的任务(task),因此它将占用分配给 Spark Streaming 的应用程序的所有核中的一个核(core)。因此,要记住,一个 Spark Streaming 应用需要分配足够的核(core)(或线程(threads),如果本地运行的话)来处理所接收的数据,以及来运行接收器(receiver(s))。
    要记住的几点
    • 当在本地运行一个 Spark Streaming 程序的时候,不要使用 “local” 或者 “local[1]” 作为 master 的 URL。这两种方法中的任何一个都意味着只有一个线程将用于运行本地任务。如果你正在使用一个基于接收器(receiver)的输入离散流(input DStream)(例如,sockets,Kafka,Flume 等),则该单独的线程将用于运行接收器(receiver),而没有留下任何的线程用于处理接收到的数据。因此,在本地运行时,总是用 “local[n]” 作为 master URL,其中的 n > 运行接收器的数量(查看 Spark 属性 来了解怎样去设置 master 的信息)。
    • 将逻辑扩展到集群上去运行,分配给 Spark Streaming 应用程序的内核(core)的内核数必须大于接收器(receiver)的数量。否则系统将接收数据,但是无法处理它。

    基础的 Sources(数据源)
    我们已经简单地了解过了在 入门示例 中 ssc.socketTextStream(...) 的例子,例子中是通过从一个 TCP socket 连接接收到的文本数据来创建了一个离散流(DStream)。除了 sockets 之外,StreamingContext API 也提供了根据文件作为输入来源创建离散流(DStreams)的方法。
    • File Streams: 用于从文件中读取数据,在任何与 HDFS API 兼容的文件系统中(即,HDFS,S3,NFS 等),一个 DStream 可以像下面这样创建:
      streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory) streamingContext.fileStream&lt;KeyClass, ValueClass, InputFormatClass&gt;(dataDirectory); streamingContext.textFileStream(dataDirectory)
      Spark Streaming 将监控dataDirectory 目录并且该目录中任何新建的文件 (写在嵌套目录中的文件是不支持的)。注意
      对于简单的文本文件,还有一个更加简单的方法 streamingContext.textFileStream(dataDirectory)。并且文件流(file streams)不需要运行一个接收器(receiver),因此,不需要分配内核(core)。
      Python API 在 Python API 中 fileStream 是不可用的,只有 textFileStream 是可用的。

      • 文件必须具有相同的数据格式。
      • 文件必须被创建在 dataDirectory 目录中,通过 atomically(原子的)moving(移动)renaming(重命名) 它们到数据目录。
      • 一旦移动,这些文件必须不能再更改,因此如果文件被连续地追加,新的数据将不会被读取。

    • Streams based on Custom Receivers(基于自定义的接收器的流): DStreams 可以使用通过自定义的 receiver(接收器)接收到的数据来创建。更多细节请参阅 自定义 Receiver 指南。
    • Queue of RDDs as a Stream(RDDs 队列作为一个流): 为了使用测试数据测试 Spark Streaming 应用程序,还可以使用 streamingContext.queueStream(queueOfRDDs) 创建一个基于 RDDs 队列的 DStream,每个进入队列的 RDD 都将被视为 DStream 中的一个批次数据,并且就像一个流进行处理。

    想要了解更多的关于从 sockets 和文件(files)创建的流的细节,请参阅相关函数的 API文档,它们在 StreamingContext for Scala,JavaStreamingContext for Java 以及 StreamingContext for Python 中。
    高级 Sources(数据源)
    Python API 从 Spark 2.2.0 开始,在 Python API 中的 Kafka,Kinesis 和 Flume 这样的外部数据源都是可用的。
    这一类别的 sources(数据源)需要使用非 Spark 库中的外部接口,它们中的其中一些还需要比较复杂的依赖关系(例如,Kafka 和 Flume)。因此,为了最小化有关的依赖关系的版本冲突的问题,这些资源本身不能创建 DStream 的功能,它是通过 依赖 单独的类库实现创建 DStream 的功能。
    请注意,这些高级 sources(数据源)不能再 Spark shell 中使用,因此,基于这些高级 sources(数据源)的应用程序不能在 shell 中被测试。如果你真的想要在 Spark shell 中使用它们,你必须下载带有它的依赖的相应的 Maven 组件的 JAR,并且将其添加到 classpath。
    一些高级的 sources(数据源)如下。
    • Kafka: Spark Streaming 2.2.0 与 Kafka broker 版本 0.8.2.1 或更高是兼容的。更多细节请参阅 Kafka 集成指南。
    • Flume: Spark Streaming 2.2.0 与 Flume 1.6.0 相兼容。更多细节请参阅 Flume 集成指南。
    • Kinesis: Spark Streaming 2.2.0 与 Kinesis Client Library 1.2.1 相兼容。更多细节请参阅 Kinesis 集成指南。

    自定义 Sources(数据源)
    Python API 在 Python 中还不支持这一功能。
    Input DStreams 也可以从自定义数据源中创建。如果您想这样做,需要实现一个用户自定义的 receiver(看下一节以了解它是什么),它可以从自定义的 sources(数据源)中接收数据并且推送它到 Spark。更多细节请参阅 自定义 Receiver 指南。
    Receiver Reliability(接收器的可靠性)
    可以有两种基于他们的 reliability可靠性 的数据源。数据源(如 Kafka 和 Flume)允许传输的数据被确认。如果系统从这些可靠的数据来源接收数据,并且被确认(acknowledges)正确地接收数据,它可以确保数据不会因为任何类型的失败而导致数据丢失。这样就出现了 2 种接收器(receivers):
    • Reliable Receiver(可靠的接收器) - 当数据被接收并存储在 Spark 中并带有备份副本时,一个可靠的接收器(reliable receiver)正确地发送确认(acknowledgment)给一个可靠的数据源(reliable source)。
    • Unreliable Receiver(不可靠的接收器) - 一个不可靠的接收器(unreliable receiver)不发送确认(acknowledgment)到数据源。这可以用于不支持确认的数据源,或者甚至是可靠的数据源当你不想或者不需要进行复杂的确认的时候。

    在 自定义 Receiver 指南 中描述了关于如何去编写一个 reliable receiver(可靠的接收器)的细节。

    DStreams 上的 Transformations(转换)
    与 RDD 类似,transformation 允许从 input DStream 输入的数据做修改。DStreams 支持很多在 RDD 中可用的 transformation 算子。一些常用的如下所示 :
    与RDD类似,类似,transformation 允许修改来自 input DStream 的数据。DStreams 支持标准的 Spark RDD 上可用的许多转换。一些常见的如下。

    UpdateStateByKey 操作
    该 updateStateByKey 操作允许您维护任意状态,同时不断更新新信息。你需要通过两步来使用它。
    • 定义 state - state 可以是任何的数据类型。
    • 定义 state update function(状态更新函数)- 使用函数指定如何使用先前状态来更新状态,并从输入流中指定新值。

    在每个 batch 中,Spark 会使用状态更新函数为所有已有的 key 更新状态,不管在 batch 中是否含有新的数据。如果这个更新函数返回一个 none,这个 key-value pair 也会被消除。
    让我们举个例子来说明。在例子中,假设你想保持在文本数据流中看到的每个单词的运行计数,运行次数用一个 state 表示,它的类型是整数,我们可以使用如下方式来定义 update 函数:
    def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
        val newCount = ...  // add the new values with the previous running count to get the new count
        Some(newCount)
    }
    这里是一个应用于包含 words(单词)的 DStream 上(也就是说,在 先前的示例中,该 pairs DStream 包含了 (word, 1) pair)。
    val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
    update 函数将会被每个单词调用,newValues 拥有一系列的 1(来自 (word, 1) pairs),runningCount 拥有之前的次数。
    Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
      (values, state) -> {
        Integer newSum = ...  // add the new values with the previous running count to get the new count
        return Optional.of(newSum);
      };
    请注意,使用 updateStateByKey 需要配置的 checkpoint(检查点)的目录,这里是更详细关于讨论 checkpointing 的部分。

    Transform Operation(转换操作)
    transform 操作(以及它的变化形式如 transformWith)允许在 DStream 运行任何 RDD-to-RDD 函数。它能够被用来应用任何没在 DStream API 中提供的 RDD 操作。例如,连接数据流中的每个批(batch)和另外一个数据集的功能并没有在 DStream API 中提供,然而你可以简单的利用 transform 方法做到。这使得有非常强大的可能性。例如,可以通过将输入数据流与预先计算的垃圾邮件信息(也可以使用 Spark 一起生成)进行实时数据清理,然后根据它进行过滤。
    val spamInfoRDD = ssc.sparkContext.newAPIhadoopRDD(...) // RDD containing spam information

    val cleanedDStream = wordCounts.transform { rdd =>
      rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
      ...
    }
    请注意,每个 batch interval(批间隔)提供的函数被调用。这允许你做随时间变动的 RDD 操作,即 RDD 操作,分区的数量,广播变量,等等。batch 之间等可以改变。
    Window Operations(窗口操作)
    Spark Streaming 也支持 windowed computations(窗口计算),它允许你在数据的一个滑动窗口上应用 transformation(转换)。下图说明了这个滑动窗口。


    如上图显示,窗口在源 DStream 上 slides(滑动),合并和操作落入窗内的源 RDDs,产生窗口化的 DStream 的 RDDs。在这个具体的例子中,程序在三个时间单元的数据上进行窗口操作,并且每两个时间单元滑动一次。这说明,任何一个窗口操作都需要指定两个参数。
    • window length(窗口长度) - 窗口的持续时间(图 3)。
    • sliding interval(滑动间隔) - 执行窗口操作的间隔(图 2)。

    这两个参数必须是 source DStream 的 batch interval(批间隔)的倍数(图 1)。
    让我们举例以说明窗口操作。例如,你想扩展前面的例子用来计算过去 30 秒的词频,间隔时间是 10 秒。为了达到这个目的,我们必须在过去 30 秒的 (wrod, 1) pairs 的 pairs DStream 上应用 reduceByKey 操作。用方法 reduceByKeyAndWindow 实现。
    // Reduce last 30 seconds of data, every 10 seconds
    val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30),
    一些常用的窗口操作如下所示,这些操作都需要用到上文提到的两个参数 - windowLength(窗口长度)slideInterval(滑动的时间间隔)

    Join 操作
    最后,它值得强调的是,您可以轻松地在 Spark Streaming 中执行不同类型的 join。
    Stream-stream joins
    Streams(流)可以非常容易地与其他流进行 join。
    val stream1: DStream[String, String] = ...
    val stream2: DStream[String, String] = ...
    val joinedStream = stream1.join(stream2)
    这里,在每个 batch interval(批间隔)中,由 stream1 生成的 RDD 将与 stream2 生成的 RDD 进行 jion。你也可以做 leftOuterJoin,rightOuterJoin,fullOuterJoin。此外,在 stream(流)的窗口上进行 join 通常是非常有用的。这也很容易做到。
    val windowedStream1 = stream1.window(Seconds(20))
    val windowedStream2 = stream2.window(Minutes(1))
    val joinedStream = windowedStream1.join(windowedStream2)
    Stream-dataset joins
    这在解释 DStream.transform 操作时已经在前面演示过了。这是另一个 join window stream(窗口流)与 dataset 的例子。
    val dataset: RDD[String, String] = ...
    val windowedStream = stream.window(Seconds(20))...
    val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
    实际上,您也可以动态更改要加入的 dataset。提供给 transform 的函数是每个 batch interval(批次间隔)进行评估,因此将使用 dataset 引用指向当前的 dataset。

    DStreams 上的输出操作
    输出操作允许将 DStream 的数据推送到外部系统,如数据库或文件系统。由于输出操作实际上允许外部系统使用变换后的数据,所以它们触发所有 DStream 变换的实际执行(类似于RDD的动作)。目前,定义了以下输出操作:

    foreachRDD 设计模式的使用
    dstream.foreachRDD 是一个强大的原语,允许将数据发送到外部系统。但是,了解如何正确有效地使用这个原语很重要。避免一些常见的错误如下。
    通常向外部系统写入数据需要创建连接对象(例如与远程服务器的 TCP 连接),并使用它将数据发送到远程系统。为此,开发人员可能会无意中尝试在Spark driver 中创建连接对象,然后尝试在Spark工作人员中使用它来在RDD中保存记录。例如(在 Scala 中):
    dstream.foreachRDD { rdd =>
      val connection = createNewConnection()  // executed at the driver
      rdd.foreach { record =>
        connection.send(record) // executed at the worker
      }
    }
    这是不正确的,因为这需要将连接对象序列化并从 driver 发送到 worker。这种连接对象很少能跨机器转移。此错误可能会显示为序列化错误(连接对象不可序列化),初始化错误(连接对象需要在 worker 初始化)等。正确的解决方案是在 worker 创建连接对象。
    但是,这可能会导致另一个常见的错误 - 为每个记录创建一个新的连接。例如:
    dstream.foreachRDD { rdd =>
      rdd.foreach { record =>
        val connection = createNewConnection()
        connection.send(record)
        connection.close()
      }
    }
    通常,创建连接对象具有时间和资源开销。因此,创建和销毁每个记录的连接对象可能会引起不必要的高开销,并可显着降低系统的总体吞吐量。一个更好的解决方案是使用 rdd.foreachPartition - 创建一个连接对象,并使用该连接在 RDD 分区中发送所有记录。
    dstream.foreachRDD { rdd =>  rdd.foreachPartition { partitionOfRecords =>    val connection = createNewConnection()    partitionOfRecords.foreach(record => connection.send(record))    connection.close()  }}

    最后,可以通过跨多个RDD /批次重用连接对象来进一步优化。可以维护连接对象的静态池,而不是将多个批次的 RDD 推送到外部系统时重新使用,从而进一步减少开销。
    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // ConnectionPool is a static, lazily initialized pool of connections
        val connection = ConnectionPool.getConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      }
    }
    请注意,池中的连接应根据需要懒惰创建,如果不使用一段时间,则会超时。这实现了最有效地将数据发送到外部系统.
    其他要记住的要点:
    • DStreams 通过输出操作进行延迟执行,就像 RDD 由 RDD 操作懒惰地执行。具体来说,DStream 输出操作中的 RDD 动作强制处理接收到的数据。因此,如果您的应用程序没有任何输出操作,或者具有 dstream.foreachRDD()等输出操作,而在其中没有任何 RDD 操作,则不会执行任何操作。系统将简单地接收数据并将其丢弃。
    • 默认情况下,输出操作是 one-at-a-time 执行的。它们按照它们在应用程序中定义的顺序执行。



    DataFrame 和 SQL 操作
    您可以轻松地在流数据上使用 DataFrames and SQL 和 SQL 操作。您必须使用 StreamingContext 正在使用的 SparkContext 创建一个 SparkSession。此外,必须这样做,以便可以在 driver 故障时重新启动。这是通过创建一个简单实例化的 SparkSession 单例实例来实现的。这在下面的示例中显示。它使用 DataFrames 和 SQL 来修改早期的字数 示例以生成单词计数。将每个 RDD 转换为 DataFrame,注册为临时表,然后使用 SQL 进行查询。
    /** DataFrame operations inside your streaming program */

    val words: DStream[String] = ...

    words.foreachRDD { rdd =>

      // Get the singleton instance of SparkSession
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      // Convert RDD[String] to DataFrame
      val wordsDataFrame = rdd.toDF("word")

      // Create a temporary view
      wordsDataFrame.createOrReplaceTempView("words")

      // Do word count on DataFrame using SQL and print it
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      wordCountsDataFrame.show()
    }
    Checkpointing
    streaming 应用程序必须 24/7 运行,因此必须对应用逻辑无关的故障(例如,系统故障,JVM 崩溃等)具有弹性。为了可以这样做,Spark Streaming 需要 checkpoint 足够的信息到容错存储系统,以便可以从故障中恢复。checkpoint 有两种类型的数据。
    • Metadata checkpointing - 将定义 streaming 计算的信息保存到容错存储(如 HDFS)中。这用于从运行 streaming 应用程序的 driver 的节点的故障中恢复(稍后详细讨论)。元数据包括:

      • Configuration - 用于创建流应用程序的配置。
      • DStream operations - 定义 streaming 应用程序的 DStream 操作集。
      • Incomplete batches - 批量的job 排队但尚未完成。

    • Data checkpointing - 将生成的 RDD 保存到可靠的存储。这在一些将多个批次之间的数据进行组合的 状态 变换中是必需的。在这种转换中,生成的 RDD 依赖于先前批次的 RDD,这导致依赖链的长度随时间而增加。为了避免恢复时间的这种无限增加(与依赖关系链成比例),有状态转换的中间 RDD 会定期 checkpoint 到可靠的存储(例如 HDFS)以切断依赖关系链。

    总而言之,元数据 checkpoint 主要用于从 driver 故障中恢复,而数据或 RDD checkpoint 对于基本功能(如果使用有状态转换)则是必需的。
    何时启用 checkpoint
    对于具有以下任一要求的应用程序,必须启用 checkpoint:
    • 使用状态转换 - 如果在应用程序中使用 updateStateByKey或 reduceByKeyAndWindow(具有反向功能),则必须提供 checkpoint 目录以允许定期的 RDD checkpoint。
    • 从运行应用程序的 driver 的故障中恢复 - 元数据 checkpoint 用于使用进度信息进行恢复。

    请注意,无需进行上述有状态转换的简单 streaming 应用程序即可运行,无需启用 checkpoint。在这种情况下,驱动器故障的恢复也将是部分的(一些接收但未处理的数据可能会丢失)。这通常是可以接受的,许多运行 Spark Streaming 应用程序。未来对非 Hadoop 环境的支持预计会有所改善。
    如何配置 checkpoint
    可以通过在保存 checkpoint 信息的容错,可靠的文件系统(例如,HDFS,S3等)中设置目录来启用 checkpoint。这是通过使用 streamingContext.checkpoint(checkpointDirectory) 完成的。这将允许您使用上述有状态转换。另外,如果要使应用程序从 driver 故障中恢复,您应该重写 streaming 应用程序以具有以下行为。
    • 当程序第一次启动时,它将创建一个新的 StreamingContext,设置所有流,然后调用 start()。
    • 当程序在失败后重新启动时,它将从 checkpoint 目录中的 checkpoint 数据重新创建一个 StreamingContext。

    使用 StreamingContext.getOrCreate 可以简化此行为。这样使用如下。
    // Function to create and setup a new StreamingContext
    def functionToCreateContext(): StreamingContext = {
      val ssc = new StreamingContext(...)   // new context
      val lines = ssc.socketTextStream(...) // create DStreams
      ...
      ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
      ssc
    }

    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

    // Do additional setup on context that needs to be done,
    // irrespective of whether it is being started or restarted
    context. ...

    // Start the context
    context.start()
    context.awaitTermination()
    除了使用 getOrCreate 之外,还需要确保在失败时自动重新启动 driver 进程。这只能由用于运行应用程序的部署基础架构完成。这在 部署 部分进一步讨论。
    请注意,RDD 的 checkpoint 会导致保存到可靠存储的成本。这可能会导致 RDD 得到 checkpoint 的批次的处理时间增加。因此,需要仔细设置 checkpoint 的间隔。在小批量大小(例如:1秒),检查每个批次可能会显着降低操作吞吐量。相反,checkpoint 太少会导致谱系和任务大小增长,这可能会产生不利影响。对于需要 RDD checkpoint 的状态转换,默认间隔是至少10秒的批间隔的倍数。它可以通过使用 dstream.checkpoint(checkpointInterval) 进行设置。通常,DStream 的5到10个滑动间隔的 checkpoint 间隔是一个很好的设置。
    Accumulators,Broadcast 变量,和 Checkpoint
    在Spark Streaming中,无法从 checkpoint 恢复 Accumulators 和 Broadcast 变量。如果启用 checkpoint 并使用 Accumulators 或 Broadcast 变量,则必须为 Accumulators 和 Broadcast 变量 创建延迟实例化的单例实例,以便在 driver 重新启动失败后重新实例化。这在下面的示例中显示:
    object WordBlacklist {

      @volatile private var instance: Broadcast[Seq[String]] = null

      def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
        if (instance == null) {
          synchronized {
            if (instance == null) {
              val wordBlacklist = Seq("a", "b", "c")
              instance = sc.broadcast(wordBlacklist)
            }
          }
        }
        instance
      }
    }

    object DroppedWordsCounter {

      @volatile private var instance: LongAccumulator = null

      def getInstance(sc: SparkContext): LongAccumulator = {
        if (instance == null) {
          synchronized {
            if (instance == null) {
              instance = sc.longAccumulator("WordsInBlacklistCounter")
            }
          }
        }
        instance
      }
    }

    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
      // Get or register the blacklist Broadcast
      val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
      // Get or register the droppedWordsCounter Accumulator
      val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
      // Use blacklist to drop words and use droppedWordsCounter to count them
      val counts = rdd.filter { case (word, count) =>
        if (blacklist.value.contains(word)) {
          droppedWordsCounter.add(count)
          false
        } else {
          true
        }
      }.collect().mkString("[", ", ", "]")
      val output = "Counts at time " + time + " " + counts
    })
    应用程序部署
    本节讨论部署 Spark Streaming 应用程序的步骤。
    要求
    要运行 Spark Streaming 应用程序,您需要具备以下功能。
    • 集群管理器集群 - 这是任何 Spark 应用程序的一般要求,并在 部署指南 中详细讨论。
    • 打包应用程序 JAR - 您必须将 streaming 应用程序编译为 JAR。如果您正在使用 spark-submit 启动应用程序,则不需要在 JAR 中提供 Spark 和 Spark Streaming。但是,如果您的应用程序使用高级资源(例如:Kafka,Flume),那么您将必须将他们链接的额外工件及其依赖项打包在用于部署应用程序的 JAR 中。例如,使用 KafkaUtils 的应用程序必须在应用程序 JAR 中包含 spark-streaming-kafka-0-8_2.11 及其所有传递依赖关系。
    • 为 executor 配置足够的内存 - 由于接收到的数据必须存储在内存中,所以 executor 必须配置足够的内存来保存接收到的数据。请注意,如果您正在进行10分钟的窗口操作,系统必须至少保留最近10分钟的内存中的数据。因此,应用程序的内存要求取决于其中使用的操作。
    • 配置 checkpoint - 如果 streaming 应用程序需要它,则 Hadoop API 兼容容错存储(例如:HDFS,S3等)中的目录必须配置为 checkpoint 目录,并且流程应用程序以 checkpoint 信息的方式编写 用于故障恢复。有关详细信息,请参阅 checkpoint 部分。
    • 配置应用程序 driver 的自动重新启动 - 要从 driver 故障自动恢复,用于运行流应用程序的部署基础架构必须监视 driver 进程,并在 driver 发生故障时重新启动 driver。不同的 集群管理者 有不同的工具来实现这一点。

      • Spark Standalone - 可以提交 Spark 应用程序 driver 以在Spark Standalone集群中运行(请参阅 集群部署模式),即应用程序 driver 本身在其中一个工作节点上运行。此外,可以指示独立的群集管理器来监督 driver,如果由于非零退出代码而导致 driver 发生故障,或由于运行 driver 的节点发生故障,则可以重新启动它。有关详细信息,请参阅 [Spark Standalone 指南]](spark-standalone.html) 中的群集模式和监督。
      • YARN - Yarn 支持类似的机制来自动重新启动应用程序。有关详细信息,请参阅 YARN文档。
      • Mesos - Marathon 已被用来实现这一点与Mesos。

    • 配置预写日志 - 自 Spark 1.2 以来,我们引入了写入日志来实现强大的容错保证。如果启用,则从 receiver 接收的所有数据都将写入配置 checkpoint 目录中的写入日志。这可以防止 driver 恢复时的数据丢失,从而确保零数据丢失(在 容错语义 部分中详细讨论)。可以通过将 配置参数 spark.streaming.receiver.writeAheadLog.enable 设置为 true来启用此功能。然而,这些更强的语义可能以单个 receiver 的接收吞吐量为代价。通过 并行运行更多的 receiver 可以纠正这一点,以增加总吞吐量。另外,建议在启用写入日志时,在日志已经存储在复制的存储系统中时,禁用在 Spark 中接收到的数据的复制。这可以通过将输入流的存储级别设置为 StorageLevel.MEMORY_AND_DISK_SER 来完成。使用 S3(或任何不支持刷新的文件系统)写入日志时,请记住启用 spark.streaming.driver.writeAheadLog.closeFileAfterWrite 和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。有关详细信息,请参阅 Spark Streaming配。请注意,启用 I/O 加密时,Spark 不会将写入写入日志的数据加密。如果需要对提前记录数据进行加密,则应将其存储在本地支持加密的文件系统中。
    • 设置最大接收速率 - 如果集群资源不够大,streaming 应用程序能够像接收到的那样快速处理数据,则可以通过设置 记录/秒 的最大速率限制来对 receiver 进行速率限制。请参阅 receiver 的 spark.streaming.receiver.maxRate 和用于 Direct Kafka 方法的 spark.streaming.kafka.maxRatePerPartition 的 配置参数。在Spark 1.5中,我们引入了一个称为背压的功能,无需设置此速率限制,因为Spark Streaming会自动计算速率限制,并在处理条件发生变化时动态调整速率限制。可以通过将 配置参数 spark.streaming.backpressure.enabled 设置为 true 来启用此 backpressure。

    升级应用程序代码
    如果运行的 Spark Streaming 应用程序需要使用新的应用程序代码进行升级,则有两种可能的机制。
    • 升级后的 Spark Streaming 应用程序与现有应用程序并行启动并运行。一旦新的(接收与旧的数据相同的数据)已经升温并准备好黄金时段,旧的可以被关掉。请注意,这可以用于支持将数据发送到两个目的地(即较早和已升级的应用程序)的数据源。
    • 现有应用程序正常关闭(请参阅 StreamingContext.stop(...) 或 JavaStreamingContext.stop(...) 以获取正常的关闭选项),以确保已关闭的数据在关闭之前被完全处理。然后可以启动升级的应用程序,这将从较早的应用程序停止的同一点开始处理。请注意,只有在支持源端缓冲的输入源(如:Kafka 和 Flume)时才可以进行此操作,因为数据需要在先前的应用程序关闭并且升级的应用程序尚未启动时进行缓冲。从升级前代码的早期 checkpoint 信息重新启动不能完成。checkpoint 信息基本上包含序列化的 Scala/Java/Python 对象,并尝试使用新的修改的类反序列化对象可能会导致错误。在这种情况下,可以使用不同的 checkpoint 目录启动升级的应用程序,也可以删除以前的 checkpoint 目录。


    Monitoring Applications(监控应用程序)
    除了 Spark 的 monitoring capabilities(监控功能),还有其他功能特定于 Spark Streaming。当使用 StreamingContext 时,Spark web UI 显示一个额外的 Streaming 选项卡,显示 running receivers(运行接收器)的统计信息(无论是 receivers(接收器)是否处于 active(活动状态),接收到的 records(记录)数,receiver error(接收器错误)等)并完成 batches(批次)(batch processing times(批处理时间),queueing delays(排队延迟)等)。这可以用来监视 streaming application(流应用程序)的进度。
    web UI 中的以下两个 metrics(指标)特别重要:
    • Processing Time(处理时间) - 处理每 batch(批)数据的时间。
    • Scheduling Delay(调度延迟) - batch(批处理)在 queue(队列)中等待处理 previous batches(以前批次)完成的时间。

    如果 batch processing time(批处理时间)始终 more than(超过)batch interval(批间隔)and/or queueing delay(排队延迟)不断增加,表示系统是无法快速 process the batches(处理批次),并且正在 falling behind(落后)。在这种情况下,请考虑 reducing(减少) batch processing time(批处理时间)。
    Spark Streaming 程序的进展也可以使用 StreamingListener 接口,这允许您获得 receiver status(接收器状态)和 processing times(处理时间)。请注意,这是一个开发人员 API 并且将来可能会改善(即,更多的信息报告)。


    Performance Tuning(性能调优)
    在集群上的 Spark Streaming application 中获得最佳性能需要一些调整。本节介绍了可调整的多个 parameters(参数)和 configurations(配置)提高你的应用程序性能。在高层次上,你需要考虑两件事情:
    • 通过有效利用集群资源,Reducing the processing time of each batch of data(减少每批数据的处理时间)。
    • 设置正确的 batch size(批量大小),以便 batches of data(批量的数据)可以像 received(被接收)处理一样快(即 data processing(数据处理)与 data ingestion(数据摄取)保持一致)。

    Reducing the Batch Processing Times(减少批处理时间)
    在 Spark 中可以进行一些优化,以 minimize the processing time of each batch(最小化每批处理时间)。这些已在 Tuning Guide(调优指南) 中详细讨论过。本节突出了一些最重要的。
    Level of Parallelism in Data Receiving(数据接收中的并行级别)
    通过网络接收数据(如Kafka,Flume,socket 等)需要 deserialized(反序列化)数据并存储在 Spark 中。如果数据接收成为系统的瓶颈,那么考虑一下 parallelizing the data receiving(并行化数据接收)。注意每个 input DStream 创建接收 single stream of data(单个数据流)的 single receiver(单个接收器)(在 work machine 上运行)。因此,可以通过创建多个 input DStreams 来实现 Receiving multiple data streams(接收多个数据流)并配置它们以从 source(s) 接收 data stream(数据流)的 different partitions(不同分区)。例如,接收 two topics of data(两个数据主题)的单个Kafka input DStream 可以分为两个 Kafka input streams(输入流),每个只接收一个 topic(主题)。这将运行两个 receivers(接收器),允许 in parallel(并行)接收数据,从而提高 overall throughput(总体吞吐量)。这些 multiple DStreams 可以 unioned(联合起来)创建一个 single DStream。然后 transformations(转化)为应用于 single input DStream 可以应用于 unified stream。如下这样做。
    val numStreams = 5
    val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
    val unifiedStream = streamingContext.union(kafkaStreams)
    unifiedStream.print()


    楼主热帖
    分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
    收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

    168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
    2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
    3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
    4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
    5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
    6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
    7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    关闭

    站长推荐上一条 /1 下一条

    关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

    GMT+8, 2024-4-25 07:56

    Powered by BI168大数据社区

    © 2012-2014 168大数据

    快速回复 返回顶部 返回列表