启用WAL后虽然Receiver的数据可靠性风险降低了,但却由于磁盘持久化带来的开销,系统整体吞吐率会有明显的下降。因此,在最新发布的Spark 1.3版本里,Spark Streaming增加了使用Direct API的方式来实现Kafka数据源的访问。
引入了Direct API后,Spark Streaming不再启动常驻的Receiver接收任务,而是直接分配给每个Batch及RDD最新的topic partition offset。job启动运行后Executor使用Kafka的simple consumer API去获取那一段offset的数据。
这样做的好处不仅避免了Receiver宕机带来的数据可靠性风险,同时也由于避免使用ZooKeeper做offset跟踪,而实现了数据的精确一次性(以下代码删除了细枝末节的逻辑):
[AppleScript] 纯文本查看 复制代码
class DirectKafkaInputDStream{
protected val kc = new KafkaCluster(kafkaParams)
protected var currentOffsets = fromOffsets
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
val rdd = KafkaRDD[K, V, U, T, R](
context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
Some(rdd)
}
预写日志 Write Ahead LogSpark 1.2开始提供了预写日志能力,用于Receiver数据及Driver元数据的持久化和故障恢复。WAL之所以能提供持久化能力,是因为它利用了可靠的HDFS做数据存储。
Spark Streaming预写日志机制的核心API包括:
- 管理WAL文件的WriteAheadLogManager
- 读/写WAL的WriteAheadLogWriter和WriteAheadLogReader
- 基于WAL的RDD:WriteAheadLogBackedBlockRDD
- 基于WAL的Partition:WriteAheadLogBackedBlockRDDPartition
以上核心API在数据接收和恢复阶段的交互示意图如图四所示。
图四 基于WAL的数据接收和恢复示意图
从WriteAheadLogWriter的源码里可以清楚地看到,每次写入一块数据buffer到HDFS后都会调用flush方法去强制刷入磁盘,然后才去取下一块数据。因此receiver接收的数据是可以保证持久化到磁盘了,因而做到了较好的数据可靠性。
[AppleScript] 纯文本查看 复制代码
private[streaming] class WriteAheadLogWriter{
private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)
def write(data: ByteBuffer): WriteAheadLogFileSegment = synchronized {
data.rewind() // Rewind to ensure all data in the buffer is retrieved
val lengthToWrite = data.remaining()
val segment = new WriteAheadLogFileSegment(path, nextOffset, lengthToWrite)
stream.writeInt(lengthToWrite)
if (data.hasArray) {
stream.write(data.array())
} else {
while (data.hasRemaining) {
val array = new Array[Byte](data.remaining)
data.get(array)
stream.write(array)
}
}
flush()
nextOffset = stream.getPos()
segment
}
结束语得益于Kafka这类可靠的data source、以及自身的checkpoint/WAL等机制,Spark Streaming的数据可靠性得到了很好的保证,数据能保证“至少一次”(at least once)被处理。但由于其outbound端的一致性实现还未完善,因此Exact once语义仍然不能端到端保证。Spark Streaming社区已经在跟进这个特性的实现(SPARK-4122),预计很快将合入trunk发布。
作者简介:叶琪,华为软件公司Universe产品部高级架构师,专注于大数据底层分布式存储和计算基础设施,是华为软件公司Hadoop发行版的主要架构师,目前兴趣点在流计算与Spark。