最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

[Hive] Hive优化实战-大表join小表优化

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-27 17:15:44 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
-数据倾斜及join无关的优化
    Hive SQL的各种优化方法基本 都和数据倾斜密切相关。
    Hive的优化分为join相关的优化和join无关的优化,从项目的实际来说,join相关的优化占了Hive优化的大部分内容,而join相关的优化又分为mapjoin可以解决的join优化和mapjoin无法解决的join优化。
    

  1、数据倾斜
    倾斜来自于统计学里的偏态分布。所谓偏态分布,即统计数据峰值与平均值不相等的频率分布,根据峰值小于或大于平均值可分为正偏函数和负偏函数,其偏离的程度可用偏态系数刻画。
    对应分布式数据处理来说,希望数据平均分布到每个处理节点。如果以每个处理节点为X轴,每个节点处理的数据为Y轴,我希望的柱状图如下:
    
    但是实际上由于业务数据本身的问题或者分布算法的问题,每个节点分配到的数据量很可能是下面的样式:
      
    更极端情况出现下面的样式:
      
      也就是说,只有待分到最多数据的节点处理完数据,整个数据处理任务才算完成,此时分布式的意义大大折扣了。实际上,即使每个节点分配到的数据量大致相同,数据仍然可能倾斜,
    比如考虑统计词频的极端问题,如果某个节点分配的词都是一个词,那么显然此节点需要的耗时将很长。
      Hive的优化正是采用各种措施和方法对上述场景的倾斜问题进行优化和处理。

    2、Hive优化
        在实际的Hive SQL开发的过程中,Hive SQL 性能的问题上实际上只有一小部分和数据倾斜有关,很多时候,Hive SQL运行慢是由于开发人员对于使用的数据了解不够以及一些不良的习惯引起的。
      开发人员需要确定以下几点:
      1、 需要计算的指标真的需要从数据仓库公共明细层来自行汇总吗? 是不是数据公共层团队开发公共汇总层已经可以满足自己的需求?对应大众的、KPI相关的指标等通常设计良好的数据仓库公共层
         肯定已经包含了,直接使用即可。
      2、真的需要扫描那么多分区吗,比如对于销售事务明细表来说,扫描一年的分区和扫描一周的分区所带来的计算、IO开销完全是两个数量级,所耗费时间肯定是不同的,所以开发人员要仔细考虑因为需求,
        尽量不浪费计算和存储资源。
      3、尽量不要使用select * from your_table这样的方式,用到哪些列就指定哪些列,另外WHERE条件中尽量添加过滤条件,以去掉无关的行,从而减少整个MapReduce任务宠需要处理、分发的数据量。
      4、输入文件不要是大量的小文件,Hive默认的Input Split是128MB(可配置),小文件可先合并成大文件。

    3、join无关的优化
      Hive SQL性能问题基本上大部分都是和JOIN相关,对于和join无关的问题主要有group by相关的倾斜和count distinct相关的优化

     3.1、group by引起的倾斜优化
      group by引起的倾斜主要是输入数据行按照group by列分别布均匀引起的,比如,假设按照供应商对销售明细事实表来统计订单数,那么部分大供应商的订单量显然非常大,而多数供应商的订单量就一般,
      由于group by 的时候是按照供应商的ID分发到每个Reduce Task,那么此时分配到大供应商的Reduce task就分配了更多的订单,从而导致数据倾斜。
      对应group by引起的数据倾斜,优化措施非常简单,只需要设置下面参数即可:
      set  hive.map.aggr = true
      set  hive.groupby.skewindata = true
      此时,Hive在数据倾斜的时候回进行负载均衡。

     3.2、count distinct优化
      在Hive开发过程中,应该小心使用count distinct,因为很容易引起性能问题,比如下面的SQL:
      select count(distinct user) from some_table;
      由于必须去重,因此Hive将会把Map阶段的输出全部分布到一个Reduce Task上,此时很容易引起性能问题,对于这种情况,可以通过先group by再count的方式优化,优化后的SQL如下:
      select  count(*)
      from (select user from some_table group by user) temp;
      其原理为:利用group by去重,再统计group by 的行数目。


4、大表join小表优化
      和join相关的优化主要分为mapjoin可以解决的优化(即大表join小表)和mapjoin无法解决的优化(即大表join大表),前者相对容易解决,后者较难,比较麻烦。
      首先介绍大表join小表优化。以销售明细表为例来说明大表join小表的场景。
      假如供应商进行评级,比如(五星、四星、三星、二星、一星),此时因为人员希望能够分析各供应商星级的每天销售情况及其占比。
      开发人员一般会写出如下SQL:
      select seller_star, count(order_id) as order_cnt
      from
           (select order_id,seller_id    from sales_detail_table  where partition_value='20181010' ) a
        left outer join
        ( select seller_id, seller_start   from dim_seller  where partition_value =''20181010' ) b
        on a.seller_id = b.seller_id
        group by b.seller_star;
      现实世界的二八准则将导致订单集中在部分供应商上,而好的供应商的评级通常会更高,此时更加加剧了数据倾斜的程度,如果不加以优化,上面SQL将会耗费很长时间,,甚至运行不出结果。
      通常来说,供应商是有限的,比如上千家,上万家,数量不会很大,而销售明细表比较大,这就是典型的大表join小表的问题,可以通过mapjoin的方式来优化,只需要添加mapjoin hint即可,
      优化后的SQL如下:
      select /*+mapjoin(b)*/
         seller_star, count(order_id) as order_cnt
      from
           (select order_id,seller_id    from sales_detail_table  where partition_value='20181010' ) a
        left outer join
        ( select seller_id, seller_start   from dim_seller  where partition_value =''20181010' ) b
        on a.seller_id = b.seller_id
        group by b.seller_star;
       /*+mapjoin(b)*/ 即是mapjoin hint,如果需要多个mapjoin多个表,则格式为:/*+mapjoin(b,c,d)*/.。 Hive对于mapjoin是默认开启的,设置参数为:
      Set hive.auto.convert.join = true;
      mapjoin优化是在Map阶段进行join,而不是通常那样在Reduce阶段按照join列进行分发后在每个Reduce节点上进行join,不需要分发也就没有倾斜的问题,相反,Hive会将小表
      全量复制到每个Map任务节点(对于本例是dim_seller表,当然进全量复制b表 sql指定的列),然后每个Map任务节点执行lookup小表即可。
      从上面的分析可以看出,小表不能太大,否则全量复制分发得不偿失,实际上Hive根据参数hive.mapjoin.smalltable.size(0.11.0版本后是hive.auto.convert.join.nonconditionaltask.size) 来确定小表的
      大小是否满足条件(默认25MB),实际中此参数允许的最大值可以修改,但是一般最大不能超过1GB(太大的话Map任务所在的节点内存会撑爆,Hive会报错。另外需要注意的是,HDFS显示的文件
      大小是压缩后的大小,当实际加载到内存的时候,容量会增大很多,很多场景下会膨胀10倍)。
-大表join大表优化  5、大表join大表优化
      如果Hive优化实战2中mapjoin中小表dim_seller很大呢?比如超过了1GB大小?这种就是大表join大表的问题。首先引入一个具体的问题场景,然后基于此介绍各自优化方案。
   5.1、问题场景
      问题场景如下:
      A表为一个汇总表,汇总的是卖家买家最近N天交易汇总信息,即对于每个卖家最近N天,其每个买家共成交了多少单,总金额是多少,假设N取90天,汇总值仅取成交单数。
      A表的字段有:buyer_id、seller_id、pay_cnt_90day。
      B表为卖家基本信息表,其字段有seller_id、sale_level,其中sale_levels是卖家的一个分层评级信息,比如吧卖家分为6个级别:S0、S1、S2、S3、S4和S5。
      要获得的结果是每个买家在各个级别的卖家的成交比例信息,比如:
      某买家:S0:10%;S1:20%;S2:20%;S3:10%;S4:20%;S5:10%。
      正如mapjoin中的例子一样,第一反应是直接join两表并统计:
      select
         m.buyer_id,
        sum(pay_cnt_90day)  as pay_cnt_90day,
        sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,
        sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,
        sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,
        sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,
        sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,
        sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5
      from (
        select a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day
        from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a
        join
               (select seller_id,  sale_level  from table_B)  b
        on  a.seller_id  = b.seller_id
        )  m
      group by m.buyer_id
      但是此SQL会引起数据倾斜,原因在于卖家的二八准则,某些卖家90天内会有几百万甚至上千万的买家,但是大部分的卖家90天内买家的数目并不多,join table_A和table_B的时候,
    ODPS会按照seller_id进行分发,table_A的大卖家引起了数据倾斜。
      但是数据本身无法用mapjoin table_B解决,因为卖家超过千万条,文件大小有几个GB,超过了1GB的限制。
   5.2、优化方案1
      一个很正常的想法是,尽管B表无法直接mapjoin, 但是是否可以间接mapjoin它呢?
      实际上此思路有两种途径:限制行和限制列。
      限制行的思路是不需要join B全表,而只需要join其在A表中存在的,对于本问题场景,就是过滤掉90天内没有成交的卖家。
      限制列的思路是只取需要的字段。
      加上如上的限制后,检查过滤后的B表是否满足了Hive  mapjoin的条件,如果能满足,那么添加过滤条件生成一个临时B表,然后mapjoin该表即可。采用此思路的语句如下:
      
      select
         m.buyer_id,
        sum(pay_cnt_90day)  as pay_cnt_90day,
        sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,
        sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,
        sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,
        sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,
        sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,
        sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5
      from (
        select /*+mapjoin(b)*/
          a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day
        from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a
        join
               (
           select seller_id,  sale_level  from table_B b0
           join
           (select seller_id from table_A group by seller_id) a0
             on b0.seller_id = a0.selller_id
          )  b
        on  a.seller_id  = b.seller_id
        )  m
      group by m.buyer_id
      此方案在一些情况可以起作用,但是很多时候还是无法解决上述问题,因为大部分卖家尽管90天内买家不多,但还是有一些的,过滤后的B表仍然很多。
  5.3、优化方案2
      此种解决方案应用场景是:倾斜的值是明确的而且数量很少,比如null值引起的倾斜。其核心是将这些引起倾斜的值随机分发到Reduce,其主要核心逻辑在于join时对这些特殊值concat随机数,
    从而达到随机分发的目的。此方案的核心逻辑如下:
       select a.user_id, a.order_id, b.user_id
      from table_a a join table_b b
      on (case when a.user_is is null then concat('hive', rand()) else a.user_id end) = b.user_id
      Hive 已对此进行了优化,只需要设置参数skewinfo和skewjoin参数,不修改SQL代码,例如,由于table_B的值“0” 和“1”引起了倾斜,值需要做如下设置:
      set hive.optimize.skewinfo=table_Bselleer_id) [ ( "0") ("1") ) ]
      set hive.optimize.skewjoin = true;
      但是方案2因为无法解决本问题场景的倾斜问题,因为倾斜的卖家大量存在而且动态变化。
  
  5.4 、优化方案3:倍数B表,在取模join     1、通用方案
      此方案的思路是建立一个numbers表,其值只有一列int 行,比如从1到10(具体值可根据倾斜程度确定),然后放大B表10倍,再取模join。代码如下:
      
      select
         m.buyer_id,
        sum(pay_cnt_90day)  as pay_cnt_90day,
        sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,
        sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,
        sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,
        sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,
        sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,
        sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5
      from (
        select a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day
        from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a
        join
               (
          select  /*+mapjoin(members)*/
            seller_id,  sale_level ,member
          from table_B
            join members
          )  b
        on  a.seller_id  = b.seller_id
          and mod(a.pay_cnt_90day,10)+1 = b.number
        )  m
      group by m.buyer_id
此思路的核心在于,既然按照seller_id分发会倾斜,那么再人工增加一列进行分发,这样之前倾斜的值的倾斜程度会减少到原来的1/10,可以通过配置numbers表改放大倍数来降低倾斜程度,
      但这样做的一个弊端是B表也会膨胀N倍。
    2、专用方案
        通用方案的思路把B表的每条数据都放大了相同的倍数,实际上这是不需要的,只需要把大卖家放大倍数即可:需要首先知道大卖家的名单,即先建立一个临时表动态存放每天最新的大卖家(
      比如dim_big_seller),同时此表的大卖家要膨胀预先设定的倍数(1000倍)。
        在A表和B表分别新建一个join列,其逻辑为:如果是大卖家,那么concat一个随机分配正整数(0到预定义的倍数之间,本例为0~1000);如果不是,保持不变。具体代码如下:
      
      select
         m.buyer_id,
        sum(pay_cnt_90day)  as pay_cnt_90day,
        sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,
        sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,
        sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,
        sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,
        sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,
        sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5
      from (
        select a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day
        from (  
          select  /*+mapjoin(big)*/
             buyer_id,  seller_id,  pay_cnt_90day,
             if(big.seller_id is not null, concat(  table_A.seller_id,  'rnd',  cast(  rand() * 1000 as bigint ), table_A.seller_id)  as seller_id_joinkey
              from table_A
               left outer join
             --big表seller_id有重复,请注意一定要group by 后再join,保证table_A的行数保持不变
             (select seller_id  from dim_big_seller  group by seller_id)big
             on table_A.seller_id = big.seller_id
        )  a
        join
               (
          select  /*+mapjoin(big)*/
            seller_id,  sale_level ,
            --big表的seller_id_joinkey生成逻辑和上面的生成逻辑一样
            coalesce(seller_id_joinkey,table_B.seller_id) as seller_id_joinkey
          from table_B
            left out join
          --table_B表join大卖家表后大卖家行数扩大1000倍,其它卖家行数保持不变
          (select seller_id, seller_id_joinkey from dim_big_seller) big
          on table_B.seller_id= big.seller_id
          )  b
        on  a.seller_id_joinkey= b.seller_id_joinkey
          and mod(a.pay_cnt_90day,10)+1 = b.number
        )  m
      group by m.buyer_id
      相比通用方案,专用方案的运行效率明细好了许多,因为只是将B表中大卖家的行数放大了1000倍,其它卖家的行数保持不变,但同时代码复杂了很多,而且必须首先建立大数据表。
   5.5 、动态一分为二
      实际上方案2和3都用了一分为二的思想,但是都不彻底,对于mapjoin不能解决的问题,终极解决方案是动态一分为二,即对倾斜的键值和不倾斜的键值分开处理,不倾斜的正常join即可,
    倾斜的把他们找出来做mapjoin,最后union all其结果即可。
      但是此种解决方案比较麻烦,代码复杂而且需要一个临时表存放倾斜的键值。代码如下:
      --由于数据倾斜,先找出90天买家超过10000的卖家
      insert overwrite table  temp_table_B
      select
        m.seller_id,  n.sale_level
      from (
        select   seller_id
        from (
          select seller_id,count(buyer_id) as byr_cnt
          from table_A
          group by seller_id
          ) a
        where a.byr_cnt >10000
        ) m
      left join
      (
       select seller_id, sale_level  from table_B
      ) n
        on m.seller_id = n.seller_id;
      
      --对于90天买家超过10000的卖家直接mapjoin,对其它卖家直接正常join即可。
      
      select
         m.buyer_id,
        sum(pay_cnt_90day)  as pay_cnt_90day,
        sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,
        sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,
        sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,
        sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,
        sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,
        sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5
      from (
        select a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day
        from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a
        join
               (
          select seller_id,  a.sale_level
           from table_A  a
           left join temp_table_B b
          on a.seller_id = b.seller_id
          where b.seller_id is not null
          )  b
        on  a.seller_id  = b.seller_id
       union all
       
       select /*+mapjoin(b)*/
          a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day
        from (
           select buyer_id,  seller_id,  pay_cnt_90day   
          from table_A
          )  a
        join
               (
           select seller_id,  sale_level  from table_B
          )  b
        on  a.seller_id  = b.seller_id
     )  m  group by m.buyer_id
     ) m
     group by m.buyer_id
    总结:方案1、2以及方案3中的同用方案不能保证解决大表join大表问题,因为它们都存在种种不同的限制和特定使用场景。而方案3的专用方案和方案4是推荐的优化方案,但是它们都需要新建一个临时表
       来存储每日动态变化的大卖家。相对方案4来说,方案3的专用方案不需要对代码框架进行修改,但是B表会被放大,所以一定要是是维度表,不然统计结果会是错误的。方案4最通用,自由度最高,
       但是对代码的更改也最大,甚至修改更难代码框架,可以作为终极方案使用。
    
    参考资料:《离线和实时大数据开发实战》


楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-4-24 13:20

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表