最具影响力的数字化技术在线社区

乔帮主 发表于 2015-5-20 15:49:11

Spark技术内幕:究竟什么是RDD

本帖最后由 乔帮主 于 2015-5-20 15:49 编辑

RDD是Spark最基本,也是最根本的数据抽象。http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf 是关于RDD的论文。如果觉得英文阅读太费时间,可以看这篇译文:http://shiyanjun.cn/archives/744.html
本文也是基于这篇论文和源码,分析RDD的实现。
第一个问题,RDD是什么?Resilient Distributed Datasets(RDD,) 弹性分布式数据集。RDD是只读的、分区记录的集合。RDD只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建。这些确定性操作称之为转换,如map、filter、groupBy、join(转换不是程开发人员在RDD上执行的操作)。
RDD不需要物化。RDD含有如何从其他RDD衍生(即计算)出本RDD的相关信息(即Lineage),据此可以从物理存储的数据计算出相应的RDD分区。
看一下内部实现对于RDD的概述:
Internally, each RDD is characterized by five main properties:
*
*- A list of partitions
*- A function for computing each split
*- A list of dependencies on other RDDs
*- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
*- Optionally, a list of preferred locations to compute each split on (e.g. block locations for
*    an HDFS file)
每个RDD有5个主要的属性:


[*]一组分片(partition),即数据集的基本组成单位
[*]一个计算每个分片的函数
[*]对parent RDD的依赖,这个依赖描述了RDD之间的lineage
[*]对于key-value的RDD,一个Partitioner
[*]一个列表,存储存取每个partition的preferred位置。对于一个HDFS文件来说,存储每个partition所在的块的位置。



org.apache.spark.rdd.RDD是一个抽象类,定义了RDD的基本操作和属性。这些基本操作包括map,filter和persist。另外,org.apache.spark.rdd.PairRDDFunctions定义了key-value类型的RDD的操作,包括groupByKey,join,reduceByKey,countByKey,saveAsHadoopFile等。org.apache.spark.rdd.SequenceFileRDDFunctions包含了所有的RDD都适用的saveAsSequenceFile。

RDD支持两种操作:转换(transformation)从现有的数据集创建一个新的数据集;而动作(actions)在数据集上运行计算后,返回一个值给驱动程序。 例如,map就是一种转换,它将数据集每一个元素都传递给函数,并返回一个新的分布数据集表示结果。另一方面,reduce是一种动作,通过一些函数将所有的元素叠加起来,并将最终结果返回给Driver程序。(不过还有一个并行的reduceByKey,能返回一个分布式数据集)Spark中的所有转换都是惰性的,也就是说,他们并不会直接计算结果。相反的,它们只是记住应用到基础数据集(例如一个文件)上的这些转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这个设计让Spark更加有效率的运行。例如,我们可以实现:通过map创建的一个新数据集,并在reduce中使用,最终只返回reduce的结果给driver,而不是整个大的新数据集。默认情况下,每一个转换过的RDD都会在你在它之上执行一个动作时被重新计算。不过,你也可以使用persist(或者cache)方法,持久化一个RDD在内存中。在这种情况下,Spark将会在集群中,保存相关元素,下次你查询这个RDD时,它将能更快速访问。在磁盘上持久化数据集,或在集群间复制数据集也是支持的。
下表列出了Spark中的RDD转换和动作。每个操作都给出了标识,其中方括号表示类型参数。前面说过转换是延迟操作,用于定义新的RDD;而动作启动计算操作,并向用户程序返回值或向外部存储写数据。
表1 Spark中支持的RDD转换和动作
转换map(f : T ) U) : RDD ) RDD
filter(f : T ) Bool) : RDD ) RDD
flatMap(f : T ) Seq) : RDD ) RDD
sample(fraction : Float) : RDD ) RDD (Deterministic sampling)
groupByKey() : RDD[(K, V)] ) RDD[(K, Seq)]
reduceByKey(f : (V; V) ) V) : RDD[(K, V)] ) RDD[(K, V)]
union() : (RDD; RDD) ) RDD
join() : (RDD[(K, V)]; RDD[(K, W)]) ) RDD[(K, (V, W))]
cogroup() : (RDD[(K, V)]; RDD[(K, W)]) ) RDD[(K, (Seq, Seq))]
crossProduct() : (RDD; RDD) ) RDD[(T, U)]
mapValues(f : V ) W) : RDD[(K, V)] ) RDD[(K, W)] (Preserves partitioning)
sort(c : Comparator) : RDD[(K, V)] ) RDD[(K, V)]
partitionBy(p : Partitioner) : RDD[(K, V)] ) RDD[(K, V)]
动作count() : RDD ) Long
collect() : RDD ) Seq
reduce(f : (T; T) ) T) : RDD ) T
lookup(k : K) : RDD[(K, V)] ) Seq (On hash/range partitioned RDDs)
save(path : String) : Outputs RDD to a storage system, e.g., HDFS
注意,有些操作只对键值对可用,比如join。另外,函数名与Scala及其他函数式语言中的API匹配,例如map是一对一的映射,而flatMap是将每个输入映射为一个或多个输出(与MapReduce中的map类似)。
除了这些操作以外,用户还可以请求将RDD缓存起来。而且,用户还可以通过Partitioner类获取RDD的分区顺序,然后将另一个RDD按照同样的方式分区。有些操作会自动产生一个哈希或范围分区的RDD,像groupByKey,reduceByKey和sort等。
从一个例子开始下面的例子摘自RDD的论文,实现了处理一个HDFS日志文件中错误日志的逻辑。
lines = spark.textFile("hdfs://...")// lines is a org.apache.spark.rdd.MappedRDDerrors = lines.filter(_.startsWith("ERROR")) // errors is a org.apache.spark.rdd.FilteredRDDerrors.cache() // persist 到内存中errors.count() // 触发action,计算errors有多少个,即ERROR的多少行// Count errors mentioning MySQL:errors.filter(_.contains("MySQL")).count() // Return the time fields of errors mentioning// HDFS as an array (assuming time is field// number 3 in a tab-separated format):errors.filter(_.contains("HDFS"))      .map(_.split('\t')(3))      .collect()
spark是一个org.apache.spark.SparkContext的实例,基本上spark的应用都是以定义一个SparkContext开始的。textFile的定义如下:
/**
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   */
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD = {
    hadoopFile(path, classOf, classOf, classOf,
      minPartitions).map(pair => pair._2.toString).setName(path)
}

hadoopFile创建了一个org.apache.spark.rdd.HadoopRDD,而在HadoopRDD上调用map则生成了一个MappedRDD:

/**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
def map(f: T => U): RDD = new MappedRDD(this, sc.clean(f))


errors.cache()并不会立即执行,它的作用是在RDD的计算完成后,将结果cache起来,以供以后的计算使用,这样的话可以加快以后运算的速度。errors.count() 就触发了一个action,这个时候就需要向集群提交job了: /**   * Return the number of elements in the RDD.   */
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum提交后,SparkContext会将runJob提交到DAGScheduler,DAGScheduler会将当前的DAG划分成Stage,然后生成TaskSet后通过TaskScheduler的submitTasks提交tasks,而这又会调用SchedulerBackend,SchedulerBackend会将这些任务发送到Executor去执行。如何划分Stage?如何生成tasks?接下来会进行解析。明天要上班了,今天早点休息吧。






Spark精英汇
页: [1]
查看完整版本: Spark技术内幕:究竟什么是RDD