Parsed Logical Plan
Spark使用 ANTLR4来将SQL字符串解析为最初的LogicalPlan。
调用Spark的Analyzer将最初的Parsed Plan转化成分析后的LogicalPlan。
将Analyzed Logical Plan调用catalyst内置的优化策略,生成优化后的LogicalPlan。
使用Spark的物理策略处理优化后的LogicalPlan的每个节点。
Databricks以及众多Spark代码贡献者为Spark做了许多优化,最著名的,有SQL优化引擎Catalyst,有针对内存、CPU和I/O的Tungsten计划,还有正在逐步完善的CBO。在其中产生了Code Generation技术和Vectorization技术。参考[SPARK-12795]和[SPARK-12992]。
CodeGen
全称是Whole-Stage Code Generation技术,会在运行时动态生成Java代码,避免虚函数调用。
每次调用next()返回一个batch的数据,减少虚函数调用次数。
基于代价的优化,主要用来减少join的shuffle。
首先还是说一下基本的火山模型。
1.3.1 Volcano火山模型是一般SQL引擎经常使用的一种模型,还是用上文的例子:
其语法树的火山模型如上图,project节点传next()方法给filter节点,filter节点传next()给全表扫描节点,然后全表扫描节点返回元组给tuple,以此类推。可以发现调用了非常多的next()方法。而next()函数又是虚函数,那么什么是虚函数?给出以下几个定义:
可以看到,如果是这种传统的火山模型,效率会非常低。
1.3.2 Whole-Stage Code Generation那么我们再看一个例子:
这个例子当然可以用火山模型来画出来,但是如果换种Java手写代码的方式呢?
可以看到完全不需要next()方法。网上有一张很著名的图,是一张关于火山模型和大学新生手写代码的性能对比:
非常可怕,对不对?Spark为此实现了Whole-Stage Code Generation优化技术。其目的如下:
向量化的作用与CodeGen类似:
一个IoT下的场景:数据原本存于传统RDBMS,有唯一标识id,为了迎接工业4.0,迁移到大数据平台,要求查询性能跟查询RDBMS性能持平,并且还需要一些固定查询做ETL和报表。
那么IoT数据有什么特征?
我们没有用HBase和Impala,为什么呢?因为我们的数据
所以最终我们选择的方案是直接存到HDFS上:
使用Parquet
Spark推荐的、其支持非常好的高性能列式存储格式,Spark SQL也为其实现了向量化。并且Parquet支持min/max,同时也会将count等信息存入其meta中。
将数据最常见使用场景中的关键字段作为分区字段,使用一个分区UDF或者直接使用该分区字段作为分区值,将数据根据分区按照时间顺序存放在HDFS。
Parquet结合Spark可以实现分区过滤。将查询条件里的分区字段下推求UDF后的分区,实现分区过滤,减少I/O。此外,通过Parquet的min/max过滤,进一步减少开销。
假设有一个titem表有三个字段,id,name,type,其中type是ttype表的主键,那么我们可以用type外键来当作分区字段,分区前的schema如下:
分区后的schema如下:
partitionColumn:
dataColumns:
存到HDFS上的路径就会是如下图的格式:
在这种情况下,用Spark来读写分别是:
当然,实际情况下我们并不会用这种方式来写,因为这种方式会导致全部shuffle,数据还是通过Kafka或者Flink来导入单独append在每个分区目录的里面的。
这种情况下性能对比如图:
2.1 Vectorization的使用限制Spark源码中,FileFormat接口有一个方法叫supportBatch(),顾名思义,就是能不能使用nextBatch(),也就是向量化的批量返回。那么在ParquetFileFormat实现类里,是这么继承这个方法的:
可以看到,当中有一个参数,叫做 wholeStageMaxNumFields,也就是支持CodeGen的最大字段数量:
可以看到,它的默认值是100,为什么会有这个设置呢?两个原因:
需要继承接口:
3 Spark 查询优化之执行计划优化3.1 Cost-Based Optimization(CBO)基于代价的优化,就是根据实际数据量,评估每个计划的执行代价,选择代价最小的执行计划,主要用于Join优化,减少Shuffle。举一个tpc-ds的例子:
它的语法树是:
也就是说,在CBO开启的情况下,它知道storesales和datedim优先join是最好的,因为date_dim是维度表,数据量非常小,而item的数据量是大的。
使用方法:
配置项:spark.sql.cbo要设置为true。
意义:
减少Join的开销。
聚合下推,曾经号称在Spark2.3就要实现,但是迟迟未上,可能由于其真的难点太多,主要难点有二:
对于有Join的情况,要确定推其哪个分支。
但是在某些特定case下聚合下推还是比较好实现的,比如一个大学Oracle课程中的小例子:
在聚合下推前,执行计划是这样的:
在聚合下推后,执行计划变成了这样:
可以看到,在Join的其中一个分支,加入了一个聚合节点。
在tpc-ds某句SQL中拉下来一个单独的子SQL的性能对比如下:
由于下推到的数据源本身并没有实现聚合功能,所以相当于推到Spark的driver端后我们自己写代码计算了一遍聚合,如果在数据源本身就有聚合结果的话,查询时间是完全可以在10秒以内的。
欢迎光临 168大数据 (http://www.bi168.cn/) | Powered by Discuz! X3.2 |