最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

spark sql 在饿了么的应用实践

[复制链接]
跳转到指定楼层
楼主
发表于 2019-8-20 14:35:20 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
作者丨饿了么数据架构组

  背景介绍
目前"饿了么"Spark主要应用在streaming、ETL和ML场景上,本文主要是分享我们ETL场景从hive SQL到spark SQL的迁移实践。在整个迁移过程中我们把线上多个版本的spark(1.5.2,1.6.3)统一推动升级到2.1.1,同时从Standalone运行模式迁移到了On YARN模式,以减少我们的维护成本。在安全控制上我们参考hive的权限系统开发了统一的权限验证模块实现了hive和spark(还有presto)权限统一。在稳定性和语法兼容性问题上我们merge了社区的12个pr,自己修复了22个bug,同时向社区贡献了2个pr(SPARK-21054/SPARK-21055),最终成功率从默认的50%提高到了92.5%,目前已经迁移了100+以上的任务且已经稳定运行3个月以上,整体效率提高了6x以上(友情建议:spark2.1.1坑比较多,请使用Spark 2.2.0以后的版本)

1 构建标准测试环境
首先需要一个测试环境,是问数仓的童鞋要一些SQL,然后挨个测试吗?这显然效率太低了,而且SQL覆盖面不全。于是我们通过在hive客户端中增加了query hook和解析HistoryLog(hive.query.string字段),收集到了所有的线上ETL SQL语句,根据收集到的SQL记录开发了一个程序批量替换脚本里面的目标表名(即create table和insert overwrite table所涉及的表)来进行模拟线上实际运行情况。完成脚本的批量替换之后,就可以开始执行脚本了。我们使用的Spark版本是2.1.1,第一轮测试的结果惨不忍睹啊,只有50%的成功率…
        
问题主要是以下4类:
        1、稳定性问题
        2、数据质量问题
        3、兼容性问题
        4、性能问题

2 稳定性问题
2.1 Task执行完毕释放锁NSE
第一轮的测试结果为什么如此惨淡,原因就是这个BUG,PR是SPARK-16599,在任务结束之后释放锁的时候没有进行空判断,这个错误在2.2.0已经修复,但是2.1尚未修复。一行代码的修改就让成功率上升到了70%多。
2.2 Job失败导致SparkContext退出
Spark ThriftServer每隔几天就会挂一次,发现当Job失败的时候,有一定的概率会触发这个BUG,同样是没有做空判断,这个BUG在2.3.0才修复,PR是SPARK-20945。

2.3 读取大小为0的orc文件会导致任务失败
Orc的分split有3种策略(ETL、BI、HYBIRD),默认是HYBIRD(混合模式,根据文件大小和文件个数自动选择ETL还是BI模式),问题出在BI模式,BI模式是按照文件个数来分split的,ETL模式已经修复了这个问题,因此有两个解决方法:

1、  修改hive代码进行空判断

2、  设置hive.exec.orc.split.strategy为ETL

3 数据质量问题
数据质量问题大多数情况下都是出在需要转换的地方,以下的这些问题都在2.2.0当中修复了。
SPARK-19727
round函数问题
SPARK-20211
floor和ceil函数问题
SPARK-17913
过滤条件没法对string和long类型进行隐式转换
4 兼容性问题
4.1 UDF问题
1、以前的UDF是在hive上运行的,可能会有线程安全问题,目前发现有的日期处理的UDF用到了SimpleDateFormat,Spark在执行的时候会涉及到多线程(hive没有这个问题)一用该UDF就出错,需要修改UDF的代码,把SimpleDateFormat设置成ThreadLocal的。
2、不支持返回值是list和map的udf,这个在2.2.0已经支持了,PR: SPARK-19548。

4.2 不支持grouping_id
Spark不支持hive的grouping__id,它自己搞了一个grouping_id()来代替,但是这样会引发兼容性问题,因此为什么不在解析SQL的时候把grouping__id自动转换成grouping_id()呢?这里有一个可用的PR: SPARK-21055。

4.3 参数兼容性
有一些参数在hive当中是有意义的,比如说用户设置hive.exec.reducers.bytes.per.reducer是希望设置一个reduce处理多大的数据量,spark当中也有相应的参数,需要做匹配。以下是我们做了匹配的参数。

hive.exec.reducers.bytes.per.reducer
spark.sql.adaptive.shuffle.targetPostShuffleInputSize
mapred.reduce.tasks
spark.sql.shuffle.partitions
hive.mapjoin.smalltable.filesize
spark.sql.autoBroadcastJoinThreshold

4.4 复杂函数不起别名
在hive当中,如果没有给某个通过计算得到的列起别名的话,hive默认是会给起一个以_c开头的列名,但是spark却不会,当调用到某些可能会返回逗号的函数的时候(比如说get_json_object),会报列个数不匹配的问题。该问题的work around建议是给所有的列都起别名,拒绝使用_c0的这样的别名。

4.5 不支持永久函数
这个问题很久了,不支持的原因是Spark的代码里没有去HDFS上把jar包下载下来。另外临时函数是不需要指定库名的,但是永久函数是需要的,为了推广永久函数特意加了一个功能:在当前库找不到对应函数的时候,再去查找一下default库下的永久函数。

4.6 不支持reset参数
       一开始只使用了set命令,但是发现线上还有reset命令的场景… 这里有一个可用的PR: SPARK-21054。

4.7 乱象丛生,细节是魔鬼
诸如此类的不兼容问题还有很多……

1、  abs函数传一个string类型进去,人家hive支持…

2、  窗口函数在Spark 2.x比如强制加order by,人家hive不需要…

3、  drop table不写if exists的话,当表不存在的时候Spark是报错的,人家hive不报错…

4、  分区名居然是大小写敏感的,人家hive不敏感…

5、  不支持笛卡尔积,人家hive支持…

总之结论就是:统统都得支持,否则没法迁移。
5性能问题
除了以上这一系列问题之外,发现有一些SQL跑得比hive慢,这是为什么呢?把SQL运行起来,仔细分析总共有三种问题:

1、MapJoin问题

2、窗口函数堆外内存用超

3、CPU密集型的任务


5.1 MapJoin问题
这些SQL的问题,从DAG图和Stage的信息来分析,会让人感觉这是数据倾斜的问题,那么问题来了,既然是倾斜,为什么Spark有问题,hive没问题呢?倾斜最严重的时候,单个Task的任务有几十G,会导致任务所在机器的单块磁盘IO长时间100%,这会导致DataNode上大量的线程被卡住,新的客户端连接进来会一直等到锁,当达到dfs.socket.timeout时间之后,会有大量的任务显示超时错误。仔细对比hive的执行过程当中有MapJoin,出现问题的地方都是一个大表和一个小表做连接。对Spark的MapJoin问题进行分析,发现有以下4个问题:

1、  不支持MapJoin Hint。Spark 2.2.0支持了,Spark2.1需要合并SPARK-16475的PR

2、 MapJoin中对于被广播表大小的判断,依赖于hive元数据中表大小字段(DataSize/RawDataSize),如果该值错误,可能会造成大表被广播到内存,有OOM的风险

3、  MapJoin的判断是根据整个表的数据量,而不是根据查询的分区数据量做判断,因此在分区表上MapJoin是失效的。社区上有人提过PR:SPARK-15616,但是该PR在Spark 2.1无法运行,我们根据它的思路实现了一版

4、  Create Table AS语句会导致MapJoin失效,原因是它的子查询没有进行物理执行计划的优化


5.2 窗口函数堆外内存用超
这个问题根源是因为合并了社区的SPARK-13450,因此Spark 2.2.0也会存在这个问题。在使用窗口函数的时候,发现Executor一到shuffle阶段就频繁地被Yarn杀掉,直接登录到服务器上观察Executor的进程,发现它在某个阶段堆外内存占用会瞬间变大。
经过仔细排查该问题,发现是因为窗口函数spill的阈值非常小,默认值是4096,也就是说每4096条数据就flush生成一个文件。在合并阶段需要读入所有的文件,每个文件的buffer是堆外内存1M,如果同时有几千个文件,堆外内存就超了。
解决方法:设大spark.sql.windowExec.buffer.spill.threshold设置为1500000

5.3 CPU密集型任务
有些任务很简单,但是CPU消耗很大,对于这种任务,简单的设小mapred.max.split.size就可以让任务飞起来。反观Spark,因为担心有的任务占用资源太多,spark.dynamicAllocation.maxExecutors设置得并不大,和mapreduce动辄几千个map的任务来比,速度差得不是那么一点点。即便把Spark设置成一个Executor只执行一个任务,和mapreduce比也没有什么收益,因此这种任务不适合切换成Spark。

楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-4-27 03:22

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表