[AppleScript] 纯文本查看 复制代码
2013-08-31 09:20:39 Starting to launch local task to process map join; maximum memory = 1004929024[/backcolor]
2013-08-31 09:20:41 Processing rows: 200000 Hashtable size: 199999 Memory usage: 38844832 rate: 0.039
2013-08-31 09:20:42 Processing rows: 275567 Hashtable size: 275567 Memory usage: 51873632 rate: 0.052
2013-08-31 09:20:42 Dump the hashtable into file: file:/tmp/hadoop/hive_2013-08-31_21-20-37_444_1135806892100127714/-local-10003/HashTable-Stage-1/MapJoin-a-10-000000_0.hashtable
2013-08-31 09:20:46 Upload 1 File to: file:/tmp/hadoop/hive_2013-08-31_21-20-37_444_1135806892100127714/-local-10003/HashTable-Stage-1/MapJoin-a-10-000000_0.hashtable File size: 11022975
2013-08-31 09:20:47 Processing rows: 300000 Hashtable size: 24432 Memory usage: 8470976 rate: 0.008
2013-08-31 09:20:47 Processing rows: 400000 Hashtable size: 124432 Memory usage: 25368080 rate: 0.025
2013-08-31 09:20:48 Processing rows: 500000 Hashtable size: 224432 Memory usage: 42968080 rate: 0.043
2013-08-31 09:20:49 Processing rows: 551527 Hashtable size: 275960 Memory usage: 52022488 rate: 0.052
2013-08-31 09:20:49 Dump the hashtable into file: file:/tmp/hadoop/hive_2013-08-31_21-20-37_444_1135806892100127714/-local-10003/HashTable-Stage-1/MapJoin-a-10-000001_0.hashtable
……
这次就会看到每次构建完一个hash table(也就是所对应的对应一个bucket),会把这个hash table写入文件,重新构建新的hash table。这样一来由于每个hash table的量比较小,也就不会有内存不足的问题,整个sql也能成功运行。不过光光是这个复制动作就要花去3分半的时间,所以如果整个job本来就花不了多少时间的,那这个时间就不可小视。
最后我们试试sort merge bucket map join,在bucket map join的基础上加上下面的设置即可:
[AppleScript] 纯文本查看 复制代码
set hive.optimize.bucketmapjoin.sortedmerge = true;[/backcolor]
set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
sort merge bucket map join是不会产生hash table复制的步骤的,直接开始做实际map端join操作了,数据在join的时候边做边读。跳过复制的步骤,外加join算法的改进,使得sort merge bucket map join的效率要明显好于bucket map join。
关于join的算法虽然有这么些选择,但是个人觉得,对于日常使用,掌握默认的reduce join和普通的(无bucket)map join已经能解决大多数问题。如果小表不能完全放内存,但是小表相对大表的size量级差别也非常大的时候也可以试试bucket map join,不过其hash table分发的过程会浪费不少时间,需要评估下是否能够比reduce join更高效。而sort merge bucket map join虽然性能不错,但是把数据做成bucket本身也需要时间,另外其发动条件比较特殊,就是两边join key必须都唯一(很多介绍资料中都不提这一点。强调下必须都是唯一,哪怕只有一个表不唯一,出来的结果也是错的。当然,根据其算法原理完全可以推敲出来)。这样的场景相对比较少见,“用户基本表 join 用户扩展表”以及“用户今天的数据快照 join 用户昨天的数据快照”这类场景可能比较合适。
这里顺便说个题外话,在数据仓库中,小表往往是维度表,而小表map join这件事情其实用udf代替还会更快,因为不用单独启动一轮job,所以这也是一种可选方案。当然前提条件是维度表是固定的自然属性(比如日期),只增加不修改(比如网站的页面编号)的情况也可以考虑。如果维度有更新,要做缓慢变化维的,当然还是维表好维护。至于维表原本的一个主要用途OLAP,以Hive目前的性能是没法实现的,也就不需要多虑了。
数据倾斜所谓数据倾斜,说的是由于数据分布不均匀,个别值集中占据大部分数据量,加上hadoop的计算模式,导致计算资源不均匀引起性能下降。下图就是一个例子:
还是拿网站的访问日志说事吧。假设网站访问日志中会记录用户的user_id,并且对于注册用户使用其用户表的user_id,对于非注册用户使用一个user_id=0代表。那么鉴于大多数用户是非注册用户(只看不写),所以user_id=0占据了绝大多数。而如果进行计算的时候如果以user_id作为group by的维度或者是join key,那么个别reduce会收到比其他reduce多得多的数据——因为它要接收所有user_id=0的记录进行处理,使得其处理效果会非常差,其他reduce都跑完很久了它还在运行。
倾斜分成group by造成的倾斜和join造成的倾斜,需要分开看。
group by造成的倾斜有两个参数可以解决,一个是hive.map.aggr,默认值已经为true,意思是会做map端的combiner。所以如果你的group by查询只是做count(*)的话,其实是看不出倾斜效果的,但是如果你做的是count(distinct),那么还是会看出一点倾斜效果。另一个参数是hive.groupby. skewindata。这个参数的意思是做reduce操作的时候,拿到的key并不是所有相同值给同一个reduce,而是随机分发,然后reduce做聚合,做完之后再做一轮MR,拿前面聚合过的数据再算结果。所以这个参数其实跟hive.map.aggr做的是类似的事情,只是拿到reduce端来做,而且要额外启动一轮job,所以其实不怎么推荐用,效果不明显。
如果说要改写SQL来优化的话,可以按照下面这么做:
[AppleScript] 纯文本查看 复制代码
/*改写前*/
select a, count(distinct b) as c from tbl group by a;
/*改写后*/
select a, count(*) as c
from (select distinct a, b from tbl) group by a;
join造成的倾斜,就比如上面描述的网站访问日志和用户表两个表join:
[AppleScript] 纯文本查看 复制代码
select a.* from logs a join users b on a.user_id = b.user_id;
hive给出的解决方案叫skew join,其原理把这种user_id = 0的特殊值先不在reduce端计算掉,而是先写入hdfs,然后启动一轮map join专门做这个特殊值的计算,期望能提高计算这部分值的处理速度。当然你要告诉hive这个join是个skew join,即:
[AppleScript] 纯文本查看 复制代码
set hive.optimize.skewjoin = true;
还有要告诉hive如何判断特殊值,根据hive.skewjoin.key设置的数量hive可以知道,比如默认值是100000,那么超过100000条记录的值就是特殊值。
skew join的流程可以用下图描述:
另外对于特殊值的处理往往跟业务有关系,所以也可以从业务角度重写sql解决。比如前面这种倾斜join,可以把特殊值隔离开来(从业务角度说,users表应该不存在user_id = 0的情况,但是这里还是假设有这个值,使得这个写法更加具有通用性):
[AppleScript] 纯文本查看 复制代码
select a.* from [/size][/p][p=25, null, left][size=3](
select a.*
from (select * from logs where user_id = 0) a
join (select * from users where user_id = 0) b
on a.user_id = b.user_id
union all
select a.*
from logs a join users b
on a.user_id <> 0 and a.user_id = b.user_id
)t;
数据倾斜不仅仅是hive的问题,其实是share nothing架构下必然会碰到的数据分布问题,对此学界也有专门的研究,比如skewtune。
SQL整体优化前面对于单个job如何做优化已经做过详细讨论,但是hive查询会生成多个job,针对多个job,有什么地方需要优化?
Job间并行首先,在hive生成的多个job中,在有些情况下job之间是可以并行的,典型的就是子查询。当需要执行多个子查询union all或者join操作的时候,job间并行就可以使用了。比如下面的代码就是一个可以并行的场景示意:
[AppleScript] 纯文本查看 复制代码
select * from
(
select count(*) from logs
where log_date = 20130801 and item_id = 1
union all
select count(*) from logs
where log_date = 20130802 and item_id = 2
union all
select count(*) from logs
where log_date = 20130803 and item_id = 3
)t
设置job间并行的参数是hive.exec.parallel,将其设为true即可。默认的并行度为8,也就是最多允许sql中8个job并行。如果想要更高的并行度,可以通过hive.exec.parallel. thread.number参数进行设置,但要避免设置过大而占用过多资源。
减少Job数另外在实际开发过程中也发现,一些实现思路会导致生成多余的job而显得不够高效。比如这个需求:查询某网站日志中访问过页面a和页面b的用户数量。低效的思路是面向明细的,先取出看过页面a的用户,再取出看过页面b的用户,然后取交集,代码如下:
[AppleScript] 纯文本查看 复制代码
select count(*)
from
(select distinct user_id
from logs where page_name = ‘a’) a
join
(select distinct user_id
from logs where blog_owner = ‘b’) b
on a.user_id = b.user_id;
这样一来,就要产生2个求子查询的job,一个用于关联的job,还有一个计数的job,一共有4个job。
但是我们直接用面向统计的方法去计算的话(也就是用group by替代join),则会更加符合M/R的模式,而且生成了一个完全不带子查询的sql,只需要用一个job就能跑完:
[AppleScript] 纯文本查看 复制代码
select count(*)
from logs group by user_id
having (count(case when page_name = ‘a’ then 1 end) > 0
and count(case when page_name = ‘b’ then 1 end) > 0)
第一种查询方法符合思考问题的直觉,是工程师和分析师在实际查数据中最先想到的写法,但是如果在目前hive的query planner不是那么智能的情况下,想要更加快速的跑出结果,懂一点工具的内部机理也是必须的。
当然了,也有同学有其它的思路,只是没有上面那么高效:
[AppleScript] 纯文本查看 复制代码
select count(*) from
(
select user_id,
count(case when blog_owner = 'a' then 1 end) as visit_z,
count(case when blog_owner = 'b' then 1 end) as visit_l
from cnblogs_visit_20130801 group by user_id
) t
where visit_z > 0 and visit_l > 0;
这种实现方式转换成job就只会有2个:内层的子查询和外层的统计,所以对 SQL 和原理都比较熟悉才能在 HIVE 中游刃有余~