最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

[Flink] Flink数据倾斜问题

[复制链接]
跳转到指定楼层
楼主
发表于 2021-12-23 15:16:16 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
一、数据倾斜
1、什么是数据倾斜?
由于数据分布不均匀,造成数据大量的集中到一点,造成数据热点。
数据倾斜原理
目前我们所知道的大数据处理框架,比如 Flink、Sparkhadoop 等之所以能处理高达千亿的数据,是因为这些框架都利用了分布式计算的思想,集群中多个计算节点并行,使得数据处理能力能得到线性扩展。
在实际生产中 Flink 都是以集群的形式在运行,在运行的过程中包含了两类进程。其中 TaskManager 实际负责执行计算的 Worker,在其上执行 Flink Job 的一组 Task,Task 则是我们执行具体代码逻辑的容器。理论上只要我们的任务 Task 足够多就可以对足够大的数据量进行处理。
但是实际上大数据量经常出现,一个 Flink 作业包含 200 个 Task 节点,其中有 199 个节点可以在很短的时间内完成计算。但是有一个节点执行时间远超其他结果,并且随着数据量的持续增加,导致该计算节点挂掉,从而整个任务失败重启。我们可以在 Flink 的管理界面中看到任务的某一个 Task 数据量远超其他节点。
2、Hadoop框架的特性
  • 不怕数据大,怕数据倾斜
  • jobs数比较多的作业运行效率相对比较低,如子查询比较多。
  • sum,count,max,min等聚集函数,不会有数据倾斜问题
3、容易数据倾斜情况
  • group by
  • count(distinct ),在数据量大的情况下,容易数据倾斜,因为count(distinct)是按group by 字段分组,按distinct字段排序。
  • 小表关联超大表
数据倾斜的时候进行负载均衡
Hive.groupby.skewindata=true
当选项设定为 true,生成的查询计划会有两个MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
二、优化常用的手段
Flink 任务出现数据倾斜的直观表现是任务节点频繁出现反压,但是增加并行度后并不能解决问题;部分节点出现 OOM 异常,是因为大量的数据集中在某个节点上,导致该节点内存被爆,任务失败重启。
产生数据倾斜的原因主要有 2 个方面:
  • 业务上有严重的数据热点,比如滴滴打车的订单数据中北京、上海等几个城市的订单量远远超过其他地区;
  • 技术上大量使用了 KeyBy、GroupBy 等操作,错误的使用了分组 Key,人为产生数据热点。
因此解决问题的思路也很清晰:
  • 业务上要尽量避免热点 key 的设计,例如我们可以把北京、上海等热点城市分成不同的区域,并进行单独处理;
  • 技术上出现热点时,要调整方案打散原来的 key,避免直接聚合;此外 Flink 还提供了大量的功能可以避免数据倾斜。
解决数据倾斜问题
  • 减少job数(合并MapReduce,用Multi-group by)
  • 设置合理的mapreduce的task数,能有效提升性能。
  • 数据量较大的情况下,慎用count(distinct)。
  • 对小文件进行合并,针对文件数据源。
三、优化案例
  • join原则
将条目少的表/子查询放在 Join的左边。 原因是在 Join 操作的 Reduce 阶段,位于 Join左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生内存溢出的几率。
当一个小表关联一个超大表时,容易发生数据倾斜,可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。
         如:SELECT /*+ MAPJOIN(user)*/  l.session_id, u.username from user u join page_views lon (u. id=l.user_id) ;
  • 笛卡尔积
         当Hive设定为严格模式(hive.mapred.mode=strict)时,不允许在HQL语句中出现笛卡尔积。
         当无法躲避笛卡尔积时,采用MapJoin,会在Map端完成Join操作,将Join操作的一个或多个表完全读入内存。
         MapJoin的用法是在查询/子查询的SELECT关键字后面添加/*+MAPJOIN(tablelist) */提示优化器转化为MapJoin 。
        其中tablelist可以是一个表,或以逗号连接的表的列表。tablelist中的表将会读入内存,应该将小表写在这里
  • 控制Map数
  同时可执行的map数是有限的。
    通常情况下,作业会通过input的目录产生一个或者多个map任务
    主要的决定因素有: input的文件总个数,input的文件大小。
举例
   a) 假设input目录下有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个块(block为128M,6个128m的块和1个12m的块),从而产生7个map数
   b) 假设input目录下有3个文件a,b,c,大小分别为10m,20m,130m,那么hadoop会分隔成4个块(10m,20m,128m,2m),从而产生4个map数
两种方式控制Map数:即减少map数和增加map数
减少map数可以通过合并小文件来实现,这点是对文件数据源来讲。
增加map数的可以通过控制上一个job的reduer数来控制

Flink 消费 Kafka 上下游并行度不一致导致的数据倾斜
通常我们在使用 Flink 处理实时业务时,上游一般都是消息系统,Kafka 是使用最广泛的大数据消息系统。当使用 Flink 消费 Kafka 数据时,也会出现数据倾斜。
需要十分注意的是,我们 Flink 消费 Kafka 的数据时,是推荐上下游并行度保持一致,即 Kafka 的分区数等于 Flink Consumer 的并行度。
但是会有一种情况,为了加快数据的处理速度,来设置 Flink 消费者的并行度大于 Kafka 的分区数。如果你不做任何的设置则会导致部分 Flink Consumer 线程永远消费不到数据。
这时候你需要设置 Flink 的 Redistributing,也就是数据重分配。
GroupBy + Aggregation 分组聚合热点问题
业务上通过 GroupBy 进行分组,然后紧跟一个 SUM、COUNT 等聚合操作是非常常见的。我们都知道 GroupBy 函数会根据 Key 进行分组,完全依赖 Key 的设计,如果 Key 出现热点,那么会导致巨大的 shuffle,相同 key 的数据会被发往同一个处理节点;如果某个 key 的数据量过大则会直接导致该节点成为计算瓶颈,引起反压。
两阶段聚合解决 KeyBy 热点
KeyBy 是我们经常使用的分组聚合函数之一。在实际的业务中经常会碰到这样的场景:双十一按照下单用户所在的省聚合求订单量最高的前 10 个省,或者按照用户的手机类型聚合求访问量最高的设备类型等。
其他
Flink 一直在不断地迭代,不断出现各种各样的手段解决我们遇到的数据倾斜问题。例如,MiniBatch 微批处理手段等

楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-4-26 08:20

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表