168大数据

标题: Spark Streaming [打印本页]

作者: 168主编    时间: 2019-8-20 15:11
标题: Spark Streaming
概述
Spark Streaming 是 Spark Core API 的扩展,它支持弹性的,高吞吐的,容错的实时数据流的处理。数据可以通过多种数据源获取,例如 Kafka,Flume,Kinesis 以及 TCP sockets,也可以通过例如 map,reduce,join,window等的高级函数组成的复杂算法处理。最终,处理后的数据可以输出到文件系统,数据库以及实时仪表盘中。

在内部,它工作原理如下,Spark Streaming 接收实时输入数据流并将数据切分成多个 batch(批)数据,然后由 Spark 引擎处理它们以生成最终的 stream of results in batches(分批流结果)。

Spark Streaming 提供了一个名为 discretized streamDStream 的高级抽象,它代表一个连续的数据流。DStream 可以从数据源的输入数据流创建,例如 Kafka,Flume 以及 Kinesis,或者在其他 DStream 上进行高层次的操作以创建。在内部,一个 DStream 是通过一系列的 RDDs 来表示。
Note(注意): 在 Python 有些 API 可能会有不同或不可用。
入门示例
在我们详细介绍如何编写你自己的 Spark Streaming 程序的细节之前,让我们先来看一看一个简单的 Spark Streaming 程序的样子。比方说,我们想要计算从一个监听 TCP socket 的数据服务器接收到的文本数据(text data)中的字数。
首先,我们导入了 Spark Streaming 类和部分从 StreamingContext 隐式转换到我们的环境的名称,目的是添加有用的方法到我们需要的其他类(如 DStream)。StreamingContext 是所有流功能的主要入口点。我们创建了一个带有 2 个执行线程和间歇时间为 1 秒的本地 StreamingContext。
import org.apache.spark._import org.apache.spark.streaming._import org.apache.spark.streaming.StreamingContext._
/ 自从 Spark 1.3 开始,不再是必要的了 // 创建一个具有两个工作线程(working thread)并且批次间隔为 1 秒的本地 StreamingContext。// master 需要 2 个核,以防止饥饿情况(core不够用)(starvation scenario)。val conf = new SparkConf().setMaster("local[2]")
          .setAppName("NetworkWordCount")val ssc = new StreamingContext(conf, Seconds(1))

Using this context, we can create a DStream that represents streaming data from a TCP source, specified as hostname (e.g. localhost) and port (e.g. 9999)。使用该 context,我们可以创建一个代表从 TCP 源流数据的离散流(DStream),指定主机名(hostname)(例如 localhost)和端口(例如 9999)。

// 创建一个将要连接到 hostname:port 的 DStream,如 localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
上一步的这个 lines DStream 表示将要从数据服务器接收到的数据流。在这个离散流(DStream)中的每一条记录都是一行文本(text)。接下来,我们想要通过空格字符(space characters)拆分这些数据行(lines)成单词(words)。

// 将每一行拆分成 words(单词)val words = lines.flatMap(_.split(" "))

flatMap 是一种 one-to-many(一对多)的离散流(DStream)操作,它会通过在源离散流(source DStream)中根据每个记录(record)生成多个新纪录的形式创建一个新的离散流(DStream)。在这种情况下,在这种情况下,每一行(each line)都将被拆分成多个单词(words)和代表单词离散流(words DStream)的单词流。接下来,我们想要计算这些单词。

import org.apache.spark.streaming.StreamingContext._
// not necessary since Spark 1.3// 计算每一个 batch(批次)中的每一个 word(单词)val pairs = words.map(word => (word, 1))val wordCounts = pairs.reduceByKey(_ + _)// 在控制台打印出在这个离散流(DStream)中生成的每个 RDD 的前十个元素// 注意:必需要触发 action(很多初学者会忘记触发 action 操作,导致报错:No output operations registered, so nothing to execute)wordCounts.print()

上一步的 words DStream 进行了进一步的映射(一对一的转换)为一个 (word, 1) paris 的离散流(DStream),这个 DStream 然后被规约(reduce)来获得数据中每个批次(batch)的单词频率。最后,wordCounts.print() 将会打印一些每秒生成的计数。
Note that when these lines are executed, Spark Streaming only sets up the computation it will perform when it is started, and no real processing has started yet. To start the processing after all the transformations have been setup, we finally call
请注意,当这些行(lines)被执行的时候,Spark Streaming 仅仅设置了计算,只有在启动时才会执行,并没有开始真正地处理。为了在所有的转换都已经设置好之后开始处理,我们在最后调用:
ssc.start()             // 开始计算
ssc.awaitTermination()  // 等待计算被中断
如果你已经 下载 并且 构建 Spark,您可以使用如下方式来运行该示例。你首先需要运行 Netcat(一个在大多数类 Unix 系统中的小工具)作为我们使用的数据服务器。
$ nc -lk 9999
然后,在运行在 netcat 服务器上的终端输入的任何行(lines),都将被计算,并且每一秒都显示在屏幕上,它看起来就像下面这样:
计算的结果都是以batch为单位的。

基础概念依赖
与 Spark 类似,Spark Streaming 可以通过 Maven 来管理依赖。为了编写你自己的 Spark Streaming 程序,你必须添加以下的依赖到你的 SBT 或者 Maven 项目中。
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0"
针对从 Spark Streaming Core API 中不存在的数据源中获取数据,如 Kafka,Flume,Kinesis,你必须添加相应的坐标 spark-streaming-xyz_2.11 到依赖中。例如,有一些常见的依赖如下。
初始化 StreamingContext
为了初始化一个 Spark Streaming 程序,一个 StreamingContext 对象必须要被创建出来,它是所有的 Spark Streaming 功能的主入口点。
一个 StreamingContext 对象可以从一个 SparkConf 对象中来创建。
import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
这个 appName 参数是展示在集群 UI 界面上的应用程序的名称。master 是一个 Spark, Mesos or YARN cluster URL,或者一个特殊的 “local
  • ” 字符串以使用 local mode(本地模式)来运行。在实践中,当在集群上运行时,你不会想在应用程序中硬编码 master,而是 使用 spark-submit 来启动应用程序,并且接受该参数。然而,对于本地测试和单元测试,你可以传递 “local
  • ” 来运行 Spark Streaming 进程(检测本地系统中内核的个数)。请注意,做个内部创建了一个 SparkContext(所有 Spark 功能的出发点),它可以像 ssc.sparkContext 这样被访问。

  • notes:在定义一个 context 之后,您必须执行以下操作。
    需要记住的几点:

    Discretized Streams (DStreams)(离散化流)
    Discretized Stream or DStream 是 Spark Streaming 提供的基本抽象。它代表了一个连续的数据流,无论是从 source(数据源)接收到的输入数据流,还是通过转换输入流所产生的处理过的数据流。在内部,一个 DStream 被表示为一系列连续的 RDDs,它是 Spark 中一个不可改变的抽象,在一个 DStream 中的每个 RDD 包含来自一定的时间间隔的数据,如下图所示。
    应用于 DStream 的任何操作转化为对于底层的 RDDs 的操作。例如,在 先前的示例,转换一个行(lines)流成为单词(words)中,flatMap 操作被应用于在行离散流(lines DStream)中的每个 RDD 来生成单词离散流(words DStream)的 RDDs。如下所示。
    这些底层的 RDD 变换由 Spark 引擎(engine)计算。DStream 操作隐藏了大多数这些细节并为了方便起见,提供给了开发者一个更高级别的 API。

    Input DStreams 和 Receivers(接收器)
    输入 DStreams 是代表输入数据是从流的源数据(streaming sources)接收到的流的 DStream。在 一个入门示例 中,lines 是一个 input DStream,因为它代表着从 netcat 服务器接收到的数据的流。每一个 input DStream(除了 file stream 之外,会在本章的后面来讨论)与一个 Receiver (Scala doc,Java doc) 对象关联,它从 source(数据源)中获取数据,并且存储它到 Sparl 的内存中用于处理。
    Spark Streaming 提供了两种内置的 streaming source(流的数据源)。
    在本节的后边,我们将讨论每种类别中的现有的一些数据源。
    请注意,如果你想要在你的流处理程序中并行的接收多个数据流,你可以创建多个 input DStreams(在 性能优化 部分进一步讨论)。这将创建同时接收多个数据流的多个 receivers(接收器)。但需要注意,一个 Spark 的 worker/executor 是一个长期运行的任务(task),因此它将占用分配给 Spark Streaming 的应用程序的所有核中的一个核(core)。因此,要记住,一个 Spark Streaming 应用需要分配足够的核(core)(或线程(threads),如果本地运行的话)来处理所接收的数据,以及来运行接收器(receiver(s))。
    要记住的几点基础的 Sources(数据源)
    我们已经简单地了解过了在 入门示例 中 ssc.socketTextStream(...) 的例子,例子中是通过从一个 TCP socket 连接接收到的文本数据来创建了一个离散流(DStream)。除了 sockets 之外,StreamingContext API 也提供了根据文件作为输入来源创建离散流(DStreams)的方法。
    想要了解更多的关于从 sockets 和文件(files)创建的流的细节,请参阅相关函数的 API文档,它们在 StreamingContext for Scala,JavaStreamingContext for Java 以及 StreamingContext for Python 中。
    高级 Sources(数据源)
    Python API 从 Spark 2.2.0 开始,在 Python API 中的 Kafka,Kinesis 和 Flume 这样的外部数据源都是可用的。
    这一类别的 sources(数据源)需要使用非 Spark 库中的外部接口,它们中的其中一些还需要比较复杂的依赖关系(例如,Kafka 和 Flume)。因此,为了最小化有关的依赖关系的版本冲突的问题,这些资源本身不能创建 DStream 的功能,它是通过 依赖 单独的类库实现创建 DStream 的功能。
    请注意,这些高级 sources(数据源)不能再 Spark shell 中使用,因此,基于这些高级 sources(数据源)的应用程序不能在 shell 中被测试。如果你真的想要在 Spark shell 中使用它们,你必须下载带有它的依赖的相应的 Maven 组件的 JAR,并且将其添加到 classpath。
    一些高级的 sources(数据源)如下。
    自定义 Sources(数据源)
    Python API 在 Python 中还不支持这一功能。
    Input DStreams 也可以从自定义数据源中创建。如果您想这样做,需要实现一个用户自定义的 receiver(看下一节以了解它是什么),它可以从自定义的 sources(数据源)中接收数据并且推送它到 Spark。更多细节请参阅 自定义 Receiver 指南。
    Receiver Reliability(接收器的可靠性)
    可以有两种基于他们的 reliability可靠性 的数据源。数据源(如 Kafka 和 Flume)允许传输的数据被确认。如果系统从这些可靠的数据来源接收数据,并且被确认(acknowledges)正确地接收数据,它可以确保数据不会因为任何类型的失败而导致数据丢失。这样就出现了 2 种接收器(receivers):
    在 自定义 Receiver 指南 中描述了关于如何去编写一个 reliable receiver(可靠的接收器)的细节。

    DStreams 上的 Transformations(转换)
    与 RDD 类似,transformation 允许从 input DStream 输入的数据做修改。DStreams 支持很多在 RDD 中可用的 transformation 算子。一些常用的如下所示 :
    与RDD类似,类似,transformation 允许修改来自 input DStream 的数据。DStreams 支持标准的 Spark RDD 上可用的许多转换。一些常见的如下。

    UpdateStateByKey 操作
    该 updateStateByKey 操作允许您维护任意状态,同时不断更新新信息。你需要通过两步来使用它。
    在每个 batch 中,Spark 会使用状态更新函数为所有已有的 key 更新状态,不管在 batch 中是否含有新的数据。如果这个更新函数返回一个 none,这个 key-value pair 也会被消除。
    让我们举个例子来说明。在例子中,假设你想保持在文本数据流中看到的每个单词的运行计数,运行次数用一个 state 表示,它的类型是整数,我们可以使用如下方式来定义 update 函数:
    def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
        val newCount = ...  // add the new values with the previous running count to get the new count
        Some(newCount)
    }
    这里是一个应用于包含 words(单词)的 DStream 上(也就是说,在 先前的示例中,该 pairs DStream 包含了 (word, 1) pair)。
    val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
    update 函数将会被每个单词调用,newValues 拥有一系列的 1(来自 (word, 1) pairs),runningCount 拥有之前的次数。
    Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
      (values, state) -> {
        Integer newSum = ...  // add the new values with the previous running count to get the new count
        return Optional.of(newSum);
      };
    请注意,使用 updateStateByKey 需要配置的 checkpoint(检查点)的目录,这里是更详细关于讨论 checkpointing 的部分。

    Transform Operation(转换操作)
    transform 操作(以及它的变化形式如 transformWith)允许在 DStream 运行任何 RDD-to-RDD 函数。它能够被用来应用任何没在 DStream API 中提供的 RDD 操作。例如,连接数据流中的每个批(batch)和另外一个数据集的功能并没有在 DStream API 中提供,然而你可以简单的利用 transform 方法做到。这使得有非常强大的可能性。例如,可以通过将输入数据流与预先计算的垃圾邮件信息(也可以使用 Spark 一起生成)进行实时数据清理,然后根据它进行过滤。
    val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

    val cleanedDStream = wordCounts.transform { rdd =>
      rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
      ...
    }
    请注意,每个 batch interval(批间隔)提供的函数被调用。这允许你做随时间变动的 RDD 操作,即 RDD 操作,分区的数量,广播变量,等等。batch 之间等可以改变。
    Window Operations(窗口操作)
    Spark Streaming 也支持 windowed computations(窗口计算),它允许你在数据的一个滑动窗口上应用 transformation(转换)。下图说明了这个滑动窗口。


    如上图显示,窗口在源 DStream 上 slides(滑动),合并和操作落入窗内的源 RDDs,产生窗口化的 DStream 的 RDDs。在这个具体的例子中,程序在三个时间单元的数据上进行窗口操作,并且每两个时间单元滑动一次。这说明,任何一个窗口操作都需要指定两个参数。
    这两个参数必须是 source DStream 的 batch interval(批间隔)的倍数(图 1)。
    让我们举例以说明窗口操作。例如,你想扩展前面的例子用来计算过去 30 秒的词频,间隔时间是 10 秒。为了达到这个目的,我们必须在过去 30 秒的 (wrod, 1) pairs 的 pairs DStream 上应用 reduceByKey 操作。用方法 reduceByKeyAndWindow 实现。
    // Reduce last 30 seconds of data, every 10 seconds
    val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30),
    一些常用的窗口操作如下所示,这些操作都需要用到上文提到的两个参数 - windowLength(窗口长度)slideInterval(滑动的时间间隔)

    Join 操作
    最后,它值得强调的是,您可以轻松地在 Spark Streaming 中执行不同类型的 join。
    Stream-stream joins
    Streams(流)可以非常容易地与其他流进行 join。
    val stream1: DStream[String, String] = ...
    val stream2: DStream[String, String] = ...
    val joinedStream = stream1.join(stream2)
    这里,在每个 batch interval(批间隔)中,由 stream1 生成的 RDD 将与 stream2 生成的 RDD 进行 jion。你也可以做 leftOuterJoin,rightOuterJoin,fullOuterJoin。此外,在 stream(流)的窗口上进行 join 通常是非常有用的。这也很容易做到。
    val windowedStream1 = stream1.window(Seconds(20))
    val windowedStream2 = stream2.window(Minutes(1))
    val joinedStream = windowedStream1.join(windowedStream2)
    Stream-dataset joins
    这在解释 DStream.transform 操作时已经在前面演示过了。这是另一个 join window stream(窗口流)与 dataset 的例子。
    val dataset: RDD[String, String] = ...
    val windowedStream = stream.window(Seconds(20))...
    val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
    实际上,您也可以动态更改要加入的 dataset。提供给 transform 的函数是每个 batch interval(批次间隔)进行评估,因此将使用 dataset 引用指向当前的 dataset。

    DStreams 上的输出操作
    输出操作允许将 DStream 的数据推送到外部系统,如数据库或文件系统。由于输出操作实际上允许外部系统使用变换后的数据,所以它们触发所有 DStream 变换的实际执行(类似于RDD的动作)。目前,定义了以下输出操作:

    foreachRDD 设计模式的使用
    dstream.foreachRDD 是一个强大的原语,允许将数据发送到外部系统。但是,了解如何正确有效地使用这个原语很重要。避免一些常见的错误如下。
    通常向外部系统写入数据需要创建连接对象(例如与远程服务器的 TCP 连接),并使用它将数据发送到远程系统。为此,开发人员可能会无意中尝试在Spark driver 中创建连接对象,然后尝试在Spark工作人员中使用它来在RDD中保存记录。例如(在 Scala 中):
    dstream.foreachRDD { rdd =>
      val connection = createNewConnection()  // executed at the driver
      rdd.foreach { record =>
        connection.send(record) // executed at the worker
      }
    }
    这是不正确的,因为这需要将连接对象序列化并从 driver 发送到 worker。这种连接对象很少能跨机器转移。此错误可能会显示为序列化错误(连接对象不可序列化),初始化错误(连接对象需要在 worker 初始化)等。正确的解决方案是在 worker 创建连接对象。
    但是,这可能会导致另一个常见的错误 - 为每个记录创建一个新的连接。例如:
    dstream.foreachRDD { rdd =>
      rdd.foreach { record =>
        val connection = createNewConnection()
        connection.send(record)
        connection.close()
      }
    }
    通常,创建连接对象具有时间和资源开销。因此,创建和销毁每个记录的连接对象可能会引起不必要的高开销,并可显着降低系统的总体吞吐量。一个更好的解决方案是使用 rdd.foreachPartition - 创建一个连接对象,并使用该连接在 RDD 分区中发送所有记录。
    dstream.foreachRDD { rdd =>  rdd.foreachPartition { partitionOfRecords =>    val connection = createNewConnection()    partitionOfRecords.foreach(record => connection.send(record))    connection.close()  }}

    最后,可以通过跨多个RDD /批次重用连接对象来进一步优化。可以维护连接对象的静态池,而不是将多个批次的 RDD 推送到外部系统时重新使用,从而进一步减少开销。
    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // ConnectionPool is a static, lazily initialized pool of connections
        val connection = ConnectionPool.getConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      }
    }
    请注意,池中的连接应根据需要懒惰创建,如果不使用一段时间,则会超时。这实现了最有效地将数据发送到外部系统.
    其他要记住的要点:


    DataFrame 和 SQL 操作
    您可以轻松地在流数据上使用 DataFrames and SQL 和 SQL 操作。您必须使用 StreamingContext 正在使用的 SparkContext 创建一个 SparkSession。此外,必须这样做,以便可以在 driver 故障时重新启动。这是通过创建一个简单实例化的 SparkSession 单例实例来实现的。这在下面的示例中显示。它使用 DataFrames 和 SQL 来修改早期的字数 示例以生成单词计数。将每个 RDD 转换为 DataFrame,注册为临时表,然后使用 SQL 进行查询。
    /** DataFrame operations inside your streaming program */

    val words: DStream[String] = ...

    words.foreachRDD { rdd =>

      // Get the singleton instance of SparkSession
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      // Convert RDD[String] to DataFrame
      val wordsDataFrame = rdd.toDF("word")

      // Create a temporary view
      wordsDataFrame.createOrReplaceTempView("words")

      // Do word count on DataFrame using SQL and print it
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      wordCountsDataFrame.show()
    }
    Checkpointing
    streaming 应用程序必须 24/7 运行,因此必须对应用逻辑无关的故障(例如,系统故障,JVM 崩溃等)具有弹性。为了可以这样做,Spark Streaming 需要 checkpoint 足够的信息到容错存储系统,以便可以从故障中恢复。checkpoint 有两种类型的数据。
    总而言之,元数据 checkpoint 主要用于从 driver 故障中恢复,而数据或 RDD checkpoint 对于基本功能(如果使用有状态转换)则是必需的。
    何时启用 checkpoint
    对于具有以下任一要求的应用程序,必须启用 checkpoint:
    请注意,无需进行上述有状态转换的简单 streaming 应用程序即可运行,无需启用 checkpoint。在这种情况下,驱动器故障的恢复也将是部分的(一些接收但未处理的数据可能会丢失)。这通常是可以接受的,许多运行 Spark Streaming 应用程序。未来对非 Hadoop 环境的支持预计会有所改善。
    如何配置 checkpoint
    可以通过在保存 checkpoint 信息的容错,可靠的文件系统(例如,HDFS,S3等)中设置目录来启用 checkpoint。这是通过使用 streamingContext.checkpoint(checkpointDirectory) 完成的。这将允许您使用上述有状态转换。另外,如果要使应用程序从 driver 故障中恢复,您应该重写 streaming 应用程序以具有以下行为。
    使用 StreamingContext.getOrCreate 可以简化此行为。这样使用如下。
    // Function to create and setup a new StreamingContext
    def functionToCreateContext(): StreamingContext = {
      val ssc = new StreamingContext(...)   // new context
      val lines = ssc.socketTextStream(...) // create DStreams
      ...
      ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
      ssc
    }

    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

    // Do additional setup on context that needs to be done,
    // irrespective of whether it is being started or restarted
    context. ...

    // Start the context
    context.start()
    context.awaitTermination()
    除了使用 getOrCreate 之外,还需要确保在失败时自动重新启动 driver 进程。这只能由用于运行应用程序的部署基础架构完成。这在 部署 部分进一步讨论。
    请注意,RDD 的 checkpoint 会导致保存到可靠存储的成本。这可能会导致 RDD 得到 checkpoint 的批次的处理时间增加。因此,需要仔细设置 checkpoint 的间隔。在小批量大小(例如:1秒),检查每个批次可能会显着降低操作吞吐量。相反,checkpoint 太少会导致谱系和任务大小增长,这可能会产生不利影响。对于需要 RDD checkpoint 的状态转换,默认间隔是至少10秒的批间隔的倍数。它可以通过使用 dstream.checkpoint(checkpointInterval) 进行设置。通常,DStream 的5到10个滑动间隔的 checkpoint 间隔是一个很好的设置。
    Accumulators,Broadcast 变量,和 Checkpoint
    在Spark Streaming中,无法从 checkpoint 恢复 Accumulators 和 Broadcast 变量。如果启用 checkpoint 并使用 Accumulators 或 Broadcast 变量,则必须为 Accumulators 和 Broadcast 变量 创建延迟实例化的单例实例,以便在 driver 重新启动失败后重新实例化。这在下面的示例中显示:
    object WordBlacklist {

      @volatile private var instance: Broadcast[Seq[String]] = null

      def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
        if (instance == null) {
          synchronized {
            if (instance == null) {
              val wordBlacklist = Seq("a", "b", "c")
              instance = sc.broadcast(wordBlacklist)
            }
          }
        }
        instance
      }
    }

    object DroppedWordsCounter {

      @volatile private var instance: LongAccumulator = null

      def getInstance(sc: SparkContext): LongAccumulator = {
        if (instance == null) {
          synchronized {
            if (instance == null) {
              instance = sc.longAccumulator("WordsInBlacklistCounter")
            }
          }
        }
        instance
      }
    }

    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
      // Get or register the blacklist Broadcast
      val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
      // Get or register the droppedWordsCounter Accumulator
      val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
      // Use blacklist to drop words and use droppedWordsCounter to count them
      val counts = rdd.filter { case (word, count) =>
        if (blacklist.value.contains(word)) {
          droppedWordsCounter.add(count)
          false
        } else {
          true
        }
      }.collect().mkString("[", ", ", "]")
      val output = "Counts at time " + time + " " + counts
    })
    应用程序部署
    本节讨论部署 Spark Streaming 应用程序的步骤。
    要求
    要运行 Spark Streaming 应用程序,您需要具备以下功能。
    升级应用程序代码
    如果运行的 Spark Streaming 应用程序需要使用新的应用程序代码进行升级,则有两种可能的机制。

    Monitoring Applications(监控应用程序)
    除了 Spark 的 monitoring capabilities(监控功能),还有其他功能特定于 Spark Streaming。当使用 StreamingContext 时,Spark web UI 显示一个额外的 Streaming 选项卡,显示 running receivers(运行接收器)的统计信息(无论是 receivers(接收器)是否处于 active(活动状态),接收到的 records(记录)数,receiver error(接收器错误)等)并完成 batches(批次)(batch processing times(批处理时间),queueing delays(排队延迟)等)。这可以用来监视 streaming application(流应用程序)的进度。
    web UI 中的以下两个 metrics(指标)特别重要:
    如果 batch processing time(批处理时间)始终 more than(超过)batch interval(批间隔)and/or queueing delay(排队延迟)不断增加,表示系统是无法快速 process the batches(处理批次),并且正在 falling behind(落后)。在这种情况下,请考虑 reducing(减少) batch processing time(批处理时间)。
    Spark Streaming 程序的进展也可以使用 StreamingListener 接口,这允许您获得 receiver status(接收器状态)和 processing times(处理时间)。请注意,这是一个开发人员 API 并且将来可能会改善(即,更多的信息报告)。


    Performance Tuning(性能调优)
    在集群上的 Spark Streaming application 中获得最佳性能需要一些调整。本节介绍了可调整的多个 parameters(参数)和 configurations(配置)提高你的应用程序性能。在高层次上,你需要考虑两件事情:
    Reducing the Batch Processing Times(减少批处理时间)
    在 Spark 中可以进行一些优化,以 minimize the processing time of each batch(最小化每批处理时间)。这些已在 Tuning Guide(调优指南) 中详细讨论过。本节突出了一些最重要的。
    Level of Parallelism in Data Receiving(数据接收中的并行级别)
    通过网络接收数据(如Kafka,Flume,socket 等)需要 deserialized(反序列化)数据并存储在 Spark 中。如果数据接收成为系统的瓶颈,那么考虑一下 parallelizing the data receiving(并行化数据接收)。注意每个 input DStream 创建接收 single stream of data(单个数据流)的 single receiver(单个接收器)(在 work machine 上运行)。因此,可以通过创建多个 input DStreams 来实现 Receiving multiple data streams(接收多个数据流)并配置它们以从 source(s) 接收 data stream(数据流)的 different partitions(不同分区)。例如,接收 two topics of data(两个数据主题)的单个Kafka input DStream 可以分为两个 Kafka input streams(输入流),每个只接收一个 topic(主题)。这将运行两个 receivers(接收器),允许 in parallel(并行)接收数据,从而提高 overall throughput(总体吞吐量)。这些 multiple DStreams 可以 unioned(联合起来)创建一个 single DStream。然后 transformations(转化)为应用于 single input DStream 可以应用于 unified stream。如下这样做。
    val numStreams = 5
    val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
    val unifiedStream = streamingContext.union(kafkaStreams)
    unifiedStream.print()







    欢迎光临 168大数据 (http://www.bi168.cn/) Powered by Discuz! X3.2