最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

一篇文章学会spark-streaming

[复制链接]
跳转到指定楼层
楼主
发表于 2019-10-25 11:55:49 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
1.什么是Spark-streaming?
实际生产中会有许多应用到实时处理的场景,比如:实时监测页面点击,实时监测系统异常,实时监测来自于外部的攻击。针对这些场景,twitter研发了实时数据处理工具storm,并在后来开源。spark针对这些场景设计了spark-streaming实时计算模型,它允许用户使用一系列批处理的API去处理实时数据,能做到代码逻辑的重复使用。
和spark中的rdd非常相似,spark-streaming中使用离散化流(discretized stream)作为抽象的表示,叫做DStream。它是随时间推移而收集数据的序列,每个时间段收集到的数据在DStream内部以一个RDD的形式存在。DStream支持从kafka,flume,hdfs,s3等获取输入。DStream也支持两种操作,即转化操作和输出操作(区别于RDD中的行动操作)。转化操作又分为无状态的转化操作和有状态的转化操作,无状态的转化操作有map,filter,flatmap,repartition等,是针对单个时间区间内的操作。而有状态的转化操作可以针对不同的时间区间,后面详述。
2.两个简单的例子
2.1 监听socket获取数据,代码如下:
这里使用nc -lk 9999 在ip为10.121.33.44的机器上发送消息
[AppleScript] 纯文本查看 复制代码
object SocketStream {
  def main(args: Array[String]): Unit = {
    //本地测试,设置4核
    val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
    //以10秒为一个批次
    val ssc = new StreamingContext(conf,Seconds(10))
    //接收消息
    val dstream = ssc.socketTextStream("10.121.33.44",9999,StorageLevel.MEMORY_AND_DISK_SER)
    //监测关键字error,出现则print
    dstream.filter(_.contains("error")).foreachRDD(rdd=>{
      rdd.foreach(println(_))
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
2.2 从kafka读取数据,比较常用
[AppleScript] 纯文本查看 复制代码
object KafkaStream {

  def main(args: Array[String]): Unit = {
    //本地测试,设置4核
    val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
    //以10秒为一个批次
    val ssc = new StreamingContext(conf,Seconds(10))

    val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
    val group_id = "realtime_data"

    //kafka相关参数
    val kafka_param = Map[String,String](
      "zookeeper.connect" ->zkQuorum,
      "group.id" -> group_id,
      "zookeeper.connection.timeout.ms" -> "10000",
      "fetch.message.max.bytes" -> "10485760"
    )
    val topic = Map[String,Int]("test_topic" -> 16)
    //接收消息
    val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
    //监测关键字error,出现则print
    dstream.filter(_.contains("error")).foreachRDD(rdd=>{
      rdd.foreach(println(_))
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
3.再来谈架构
通过上面两个例子,你可能对spark-streaming有了初步的了解,我们再来看一下它的架构。
Spark-streaming使用”微批次”的架构,把流式计算当做一系列微型的批处理操作来对待,每个时间段都产生一个RDD。如图:

作用于一个DStream上的无状态转化操作会对它其中的每个RDD生效,如针对一个输入为语句的DStream做flatMap操作的示意图如下:
4.转化操作
4.1 无状态的转化操作。
无状态转化操作就是简单的将转化作用于DStream的每个RDD上面。下面列举了一些常见的转化操作,其中最后一个transform表示可以试用自定义的转化函数,尽管它前面已经提供了很多现成的API。

4.2有状态的转化操作。
有状态的转化操作是跨时间段的数据操作,一些先前的批次也被用来在新的批次中做计算。主要有滑动窗口和updateStateByKey。前者以一个时间段为滑动窗口进行操作,后者则用来跟踪每个键的状态变化。有状态的转化操作需要打开检查点机制来保证容错性。即:给ssc.checkpoint()设置一个检查点目录。
(1)基于窗口的转化操作会在一个比ssc设置的更长的时间段内,通过整合多个批次的,计算出整个大的时间窗口的结果。基于窗口的操作需要两个参数,一个是窗口时长,一个是滑动步长。这两个参数是ssc设置的时长的整数倍。下面的图表示了一个时间窗口为3,滑动步长为2的窗口转化操作。

前面提到的监测关键字error的例子,现在需要每隔20s就对前面30s有error的日志记录做计数,代码如下:
[AppleScript] 纯文本查看 复制代码
object KafkaStream {

  def main(args: Array[String]): Unit = {
    //本地测试,设置4核
    val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
    //以10秒为一个批次
    val ssc = new StreamingContext(conf,Seconds(10))

    val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
    val group_id = "realtime_data"

    //kafka相关参数
    val kafka_param = Map[String,String](
      "zookeeper.connect" ->zkQuorum,
      "group.id" -> group_id,
      "zookeeper.connection.timeout.ms" -> "10000",
      "fetch.message.max.bytes" -> "10485760"
    )
    val topic = Map[String,Int]("test_topic" -> 16)
    //接收消息
    val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER)
      .map(_._2)
    //每隔20s对前30s出现error的日志做计数
    val errors = dstream.window(Seconds(30),Seconds(20))
        .filter(_.contains("error"))
        .count()
    errors.foreachRDD(rdd=>{
      rdd.foreach(println(_))
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
(2)updateStateByKey
updateStateByKey能对键值对的数据进行不同批次间的数据计算,使用updateStateByKey,需要传入一个update函数,这个函数接收某个key最新批次对应的values,以及该key之前对应的value,按照自定义的逻辑返回一个新的value。如需要计算一个实时日志中http响应码的计数,代码如下:
[AppleScript] 纯文本查看 复制代码
object KafkaStream {

  def main(args: Array[String]): Unit = {
    //输出目录
    val output = args(0)
    //本地测试,设置4核
    val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
    //以10秒为一个批次
    val ssc = new StreamingContext(conf,Seconds(10))

    val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
    val group_id = "realtime_data"

    //kafka相关参数
    val kafka_param = Map[String,String](
      "zookeeper.connect" ->zkQuorum,
      "group.id" -> group_id,
      "zookeeper.connection.timeout.ms" -> "10000",
      "fetch.message.max.bytes" -> "10485760"
    )
    val topic = Map[String,Int]("test_topic" -> 16)
    //接收消息
    val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
    val rdd = dstream.map(_.split("\001"))
      .map(x=>(x(0),x(1).toLong))
      .updateStateByKey(update)
    //输出
    rdd.foreachRDD(_.saveAsTextFile(output))
    ssc.start()
    ssc.awaitTermination()
  }
  //update函数
  def update(new_values:Seq[Long],old_value:Option[Long]):Option[Long]={
    val current_num = new_values.size
    val result_num = current_num + old_value.getOrElse(0L)
    Some(result_num)
  }
}
(3)所有有状态转化操作
5.输出操作
输出操作比较简单,有以下几种:
6.作业稳定性
spark-streaming作业一般都要全天候不间断运行,那么作业的稳定性如何保证?主要有以下几点:
6.1 检查点机制。
其原理就是阶段性的将作业运行的数据存放到存储系统,如hdfs,s3等。当作业运行出现异常时可以从上述数据中恢复。
6.2 驱动器容错。
在创建实时计算作业的上下文时使用getOrCreate函数。代码如下:
[AppleScript] 纯文本查看 复制代码
  val ssc = StreamingContext.getOrCreate(cp_dir,createContext )
    def createContext(): StreamingContext  ={
      val sc = new SparkContext(conf)
      val ssc = new StreamingContext(sc,Seconds(10))
      ssc.checkpoint(cp_dir)
    }
文章来源:http://bigdataer.net

楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-4-26 00:38

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表