最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

[实践案例] 【数据工场dataworks-1】数据开发之数据质量的核查

[复制链接]
跳转到指定楼层
楼主
发表于 2019-9-10 17:25:10 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
【学习背景】


近期陆续有客户咨询公有云大数据的方案,对比了阿里云、腾讯云和华为云的产品,感觉阿里云的方案丰富性和资料的易用性方面都是做得最好的,而且有淘宝双十一的成功经验,决定好好学习一下阿里云大数据的方案。今天先从阿里云的课程《使用MaxCompute进行数据质量核查》开始。





【学习思路】


在阿里云上购买课程,参照实验课程进行操作。先模仿,再优化。





【实验背景】


ABC是一家销售公司,其客户可以通过网站下单订购该公司经营范围内的商品,并使用信用卡、银行卡、转账等方式付费。付费成功后,ABC公司会根据客户地址依据就近原则选择自己的货仓,指派合适的快递人员配送商品。


经过几年的经营,该公司积累了一批经营数据,他们依托阿里云大数据计算服务、大数据开发套件等搭建了一个企业级的数据平台,将历史数据以及每天产生的数据都同步到该平台上去。由于种种遗留问题,造成这些数据中存在一定的数据质量问题,为了能得到更准确的数据分析结果,使用MaxCompute技术去发现数据中的质量问题。





【实验目标】


根据具体的场景需求,实现如下的数据监控场景,并最终输出数据质量监控报告。


1、错误值:某些时间格式存在问题,导致数据库人员将部分时间字段设置成了字符串型。


监控场景:订单表内订单时间格式出错。


2、重复值:订单系统中部分记录关键信息重复(同样的人在同样的时间下了不同的订单),导致客户投诉。


监控场景:订单表同一客户同一时间下了多次订单。


3、数据不一致:地市信息名称未标准化,导致在数据分析时,未能把相同地域的数值汇总在一起。


监控场景:客户信息表省份信息异常。


4、数据完整性:配送的订单在订单表中不存在,导致物流人员空跑,效率下降。


监控场景:配送的订单在订单表中不存在。


5、缺失值:部分客户性别信息缺失,影响后续使用。


监控场景:客户信息表性别信息缺失。


6、异常值:单月购买次数异常(当月购买次数大于10次)


监控场景:同客户单月购买次数异常(当月购买次数大于10次)





【解决方案】


在这个实验课程中,核心步骤有3个。第1个是实验数据准备,第2个是配置数据质量的核查规则,第3个是通过maxcompute程序实现数据核查。前2个步骤是本课程已经提供的资源,对于学员来说,通过第3步的动手操作才能真正理解maxcompute的工作原理。





【操作步骤】


1、在阿里云官网购买课程《使用MaxCompute进行数据质量核查》,前6个课时是理论介绍,第7个课时是实验环节。本文重点总结实验环节的步骤。


课程链接:https://edu.aliyun.com/course/43 ... .0.0.66e670d7smFkQM


2、先通读一遍实验课程里面的内容,大致了解了数据质量核查的基本流程。





3、分析测试数据集中各张表之间的逻辑关系。





4、在阿里云控制台中,找到maxcompute并开通,创建一个dataworks工作空间,然后进入数据开发。本文测试用户按需购买即可。





5、新建一个临时查询(实际上是执行SQL的窗口),依次创建前面的6个数据表。





对应的SQL语句如下:


----订单表
drop table if exists ODS_EBUSI_ORDERS;
create table ODS_EBUSI_ORDERS(
        ORDER_ID BIGINT COMMENT '订单号',
        CUSTOMER_ID BIGINT COMMENT '客户编码',
        PRODUCT_ID BIGINT COMMENT '产品编码',
        PRODUCT_CNT BIGINT COMMENT '产品数量',
        ORDER_AMT DOUBLE COMMENT '产品费用',
        SHIPPING_TYPE_ID BIGINT COMMENT '运输类型',
        ORDER_TIME STRING COMMENT '订单时间'
)COMMENT '订单表';

----客户表
drop table if exists ODS_EBUSI_CUSTOMERS;
create table ODS_EBUSI_CUSTOMERS(
        CUSTOMER_ID BIGINT COMMENT '客户编码',
        CUSTOMER_NAME        STRING COMMENT '客户名称',
        AGE BIGINT COMMENT '年龄',
        GENDER BIGINT COMMENT '性别编码',
        CITY_ID BIGINT COMMENT '城市编码',
        CITY_NAME STRING COMMENT '城市名称',
        PROVINCE_ID BIGINT COMMENT '省份编码',
        PROVINCE_NAME STRING COMMENT '省份名称',
        GEN_DATE STRING COMMENT '出生日期'
)COMMENT '客户表';

----配送表
drop table if exists ODS_EBUSI_DISPATCH;
create table ODS_EBUSI_DISPATCH(
        DISPATCH_ID BIGINT COMMENT '配送单号',
        ORDER_ID BIGINT COMMENT '订单号',
        EXPRESS_STAFF_ID BIGINT COMMENT '配送员编码',
        STOREHOUSE_ID BIGINT COMMENT '仓库编码',
        EXPECT_TIME DATETIME COMMENT '期望到货时间',
        DISPATCH_TIME DATETIME COMMENT '开始送货时间'
)COMMENT '配送表';

----产品表
drop table if exists ODS_EBUSI_DIM_PRODUCT;
create table ODS_EBUSI_DIM_PRODUCT(
        PRODUCT_ID BIGINT COMMENT '产品编码',
        PRODUCT_NAME        STRING COMMENT '产品名称',
        PRICE DOUBLE COMMENT '产品价格',
        PRODUCT_CATEGORY_ID BIGINT COMMENT '产品类别'
)COMMENT '产品表';

----省份表
drop table if exists ODS_EBUSI_DIM_PROVINCE;
create table ODS_EBUSI_DIM_PROVINCE(
        PROVINCE_ID BIGINT COMMENT '省份编码',
        PROVINCE_NAME        STRING COMMENT '省份名称'
)COMMENT '省份表';

----城市表
drop table if exists ODS_EBUSI_DIM_CITY;
create table ODS_EBUSI_DIM_CITY(
        CITY_ID BIGINT COMMENT '城市编码',
        CITY_NAME        STRING COMMENT '城市名称',
        LNG DOUBLE  COMMENT '经度',
        LAT DOUBLE  COMMENT '维度',
        PROVINCE_ID BIGINT COMMENT '省份编码'
)COMMENT '城市表';
6、将课程中5个表的数据集依次导入到对应的数据表中





注意导入前检查分隔符的类型,不一定是逗号,根据实际情况选择。





注意选择数据表时,可能会看不到前面已经创建的表,需要在公共表的组件中先刷新,确保已经看到5个表已经创建成功并能够显示出来。





7、创建一个表,用于记录检查数据质量的规则,不可直接对外访问。每一行都是一个条规则,用于检查某个表的数据项是否正常。





参考上面的方法,创建规则表然后导入数据。规则数据集来自教程的附件。


drop table if exists ODS_DATA_CHECK_RULE;
create table ODS_DATA_CHECK_RULE(
     CHECK_RULE_ID STRING COMMENT '核查规则编码',
     CHECK_RULE_NAME STRING COMMENT '核查规则名称',
     CHECK_RULE_DESC STRING COMMENT '核查规则内容描述',
     CHECK_RULE_TYPE_ID STRING COMMENT '核查规则类型编码',
     CHECK_RULE_TYPE_NAME STRING COMMENT '核查规则类型名称',
     CHECK_RULE_STATUS STRING COMMENT '状态',
     CHECK_RULE_OWNER STRING COMMENT '负责人',
     CHECK_RULE_EMAIL STRING COMMENT '负责人邮箱',
     CHECK_RULE_PHONE STRING COMMENT '负责人手机',
     CHECK_RULE_TABLE STRING COMMENT '数据表',
     CHECK_RULE_SQL STRING COMMENT '监控脚本'
)COMMENT '数据质量监控规则表';






8、创建一个表,用于存放所有的检查结果表。根据该表记录的数据,就能快速看出是哪个表的哪些数据处理问题,适合做数据可视化来支撑运营工作。


drop table if exists DWD_DATA_CHECK_REPORT;
create table DWD_DATA_CHECK_REPORT(
     CHECK_RULE_ID STRING COMMENT '核查规则编码',
     CHECK_RULE_NAME STRING COMMENT '核查规则名称',
     CHECK_RULE_TYPE_NAME STRING COMMENT '核查规则类型名称',
     CHECK_RULE_OWNER STRING COMMENT '负责人',
     CHECK_RULE_TABLE STRING COMMENT '数据表',
     WARNING_TIME STRING COMMENT '告警时间',
     WARNING_CONTENT STRING COMMENT '告警内容'
)COMMENT '数据质量监控报告表'
PARTITIONED BY (
     DT STRING COMMENT '时间分区'
);
9、新建一个数据质量监控的流程。





从左侧组件栏中拖拽一个“虚拟节点”到面版中,作为流程的开始。





10、新建一个ODPS SQL节点,用于创建一个按天为单位的检查结果存放表,并将虚拟节点作为上游节点,用线连接起来。双击编辑SQL语句,保存。


----新建临时表1用于存放数据质量是否产生告警
drop table if exists TMP_DWD_DATA_CHECK_REPORT_001_${vDay};
create table TMP_DWD_DATA_CHECK_REPORT_001_${vDay}(
     CHECK_RULE_ID STRING COMMENT '核查规则编码',
     WARNING_STATUS STRING COMMENT '是否告警'
)COMMENT '数据质量监控规则告警状态';
11、新建一个ODPS SQL节点,用于检查数据十分符合规则1,如果不符合就把检查结果插入到今日的结果表中。


----插入临时表1监控规则编码check_001是否告警
insert into table TMP_DWD_DATA_CHECK_REPORT_001_${vDay}
select 'check_001',
     case when A.WARNING_CNT>0 then '1' else '0' end as WARNING_STATUS
from(select COUNT(1) WARNING_CNT
         from ODS_EBUSI_ORDERS
         where not(instr(ORDER_TIME,'-',1,1) = 5 and instr(ORDER_TIME,'-',1,2) = 8 and instr(ORDER_TIME,'-',1,3) = 0)
              or not(instr(ORDER_TIME,':',1,1) = 14 and instr(ORDER_TIME,':',1,2) = 17 and instr(ORDER_TIME,'-',1,3) = 0))as A
;
12、同理依次创建检查规则2到规则6的ODPS SQL节点。


----插入临时表1监控规则编码check_002是否告警
insert into table TMP_DWD_DATA_CHECK_REPORT_001_${vDay}
select 'check_002',
        case when B.WARNING_CNT>1 then '1' else '0' end as WARNING_STATUS
from(select SUM(WARNING_CNT) as WARNING_CNT
                from(SELECT CUSTOMER_ID,
                                                ORDER_TIME,
                                                COUNT(DISTINCT ORDER_ID) WARNING_CNT
                                        FROM ODS_EBUSI_ORDERS
                                        GROUP BY CUSTOMER_ID,
                                                ORDER_TIME
                                        HAVING COUNT(DISTINCT ORDER_ID) > 1)as A)AS B
;
----插入临时表1监控规则编码check_003是否告警
insert into table TMP_DWD_DATA_CHECK_REPORT_001_${vDay}
select 'check_003',
        case when A.WARNING_CNT>1 then '1' else '0' end as WARNING_STATUS
from(SELECT count(1) as WARNING_CNT
                FROM ODS_EBUSI_CUSTOMERS A
                        INNER JOIN ODS_EBUSI_DIM_PROVINCE B ON A.PROVINCE_ID = B.PROVINCE_ID
                WHERE A.PROVINCE_NAME <> B.PROVINCE_NAME)as A
;
----插入临时表1监控规则编码check_004是否告警
insert into table TMP_DWD_DATA_CHECK_REPORT_001_${vDay}
select 'check_004',
        case when A.WARNING_CNT>1 then '1' else '0' end as WARNING_STATUS
from(SELECT count(1) as WARNING_CNT
                FROM ODS_EBUSI_DISPATCH A
                        LEFT OUTER JOIN ODS_EBUSI_ORDERS B ON A.ORDER_ID=B.ORDER_ID
                WHERE B.ORDER_ID IS NULL)as A
;
----插入临时表1监控规则编码check_005是否告警
insert into table TMP_DWD_DATA_CHECK_REPORT_001_${vDay}
select 'check_005',
        case when A.WARNING_CNT>1 then '1' else '0' end as WARNING_STATUS
from(SELECT count(1) as WARNING_CNT
                FROM ODS_EBUSI_CUSTOMERS A
                WHERE A.GENDER IS NULL)as A
;
----插入临时表1监控规则编码check_006是否告警
insert into table TMP_DWD_DATA_CHECK_REPORT_001_${vDay}
select 'check_006',
        case when A.WARNING_CNT>1 then '1' else '0' end as WARNING_STATUS
from(SELECT SUBSTR(ORDER_TIME,1,7),
                        CUSTOMER_ID,
                        COUNT(1) as WARNING_CNT
                FROM ODS_EBUSI_ORDERS A
                GROUP BY SUBSTR(ORDER_TIME,1,7),
                        CUSTOMER_ID
                HAVING COUNT(1) > 10)as A
;
13、将当天的检查结果,合并到总表中,同样需要创建一个ODPS节点。


----插入目标表监控报告信息
insert overwrite table DWD_DATA_CHECK_REPORT PARTITION(DT)
select A.CHECK_RULE_ID,        ----'核查规则编码',
        A.CHECK_RULE_NAME,        ----'核查规则名称',
        A.CHECK_RULE_TYPE_NAME,        ----'核查规则类型名称',
        A.CHECK_RULE_OWNER,        ----'负责人',
        A.CHECK_RULE_TABLE,        ----'数据表',
        concat('${datetime}',' ','${hour}') as WARNING_TIME,        ----'告警时间',
        concat('监控规则:',A.CHECK_RULE_NAME,',告警内容:',A.CHECK_RULE_DESC,',请登录系统进行核查!') as WARNING_CONTENT,        ----'告警内容'
        '${vDay}' as DT        ----'时间分区'
from ODS_DATA_CHECK_RULE A
        inner join TMP_DWD_DATA_CHECK_REPORT_001_${vDay} B on A.CHECK_RULE_ID=B.CHECK_RULE_ID and B.WARNING_STATUS = '1'
;
14、最后再配置一个流程结算的虚拟节点,完整的检查流程如下:





15、依次展开每个ODPS的SQL节点,检查SQL代码如果涉及到参数,在右侧“属性”中根据需要,输入如下参数。 注意等号左右没有空格,相邻2个参数 以空格隔开。


vDay=$[yyyymmdd-1]     


datetime=$[yyyy-mm-dd]   


hour=$[hh24:mi:ss]


16、在“业务流程”-“data_check_数据质量监控”-“数据库开发”的属性页中,点击“运行”。


不巧的是,第一次运行就出现了问题,在第一个SQL语句的执行节点“创建结果表”失败了。


右键失败的节点,查看日志,发现出现了语法错误,错误地方正好是输入的变量vDay,因此推测是没有给这个节点设置参数说明。





解决办法:双击失败的节点,在弹出的SQL语句编辑界面,右侧的属性中设置所需的变量说明,然后保存和提交。


用同样的方法检查所有的ODPS SQL节点的参数。





重新执行,所有节点都运行通过了。


17、至此,阿里云网站上的课程就结束了,但是我的疑问来了,怎么看核查结果呢。尝试将存放结果的表查询出来“select * from dwd_data_check_report;”显示出来,结果报了一个错误。


FAILED: ODPS-0130071:[0,0] Semantic analysis exception - physical plan generation failed: java.lang.RuntimeException: Table(lyc816_datacheck,dwd_data_check_report) is full scan with all partitions, please specify partition predicates.


该错误提示查询语句对整个表进行了扫描,被限制了,原因是该表已经创建了分区,查询时必须指定分区条件,因此无法整表查询。


关于分区的详情参考:https://help.aliyun.com/document_detail/27820.html


18、分析结果表“dwd_data_check_report”的创建语句,其实已经设置了按日期进行分区。那就根据分区进行查询。


drop table if exists DWD_DATA_CHECK_REPORT;
create table DWD_DATA_CHECK_REPORT(
     CHECK_RULE_ID STRING COMMENT '核查规则编码',
     CHECK_RULE_NAME STRING COMMENT '核查规则名称',
     CHECK_RULE_TYPE_NAME STRING COMMENT '核查规则类型名称',
     CHECK_RULE_OWNER STRING COMMENT '负责人',
     CHECK_RULE_TABLE STRING COMMENT '数据表',
     WARNING_TIME STRING COMMENT '告警时间',
     WARNING_CONTENT STRING COMMENT '告警内容'
)COMMENT '数据质量监控报告表'
PARTITIONED BY (
     DT STRING COMMENT '时间分区'
);
既然查询语句增加分区条件和显示数量控制。选中结果表,点击分区信息,确定分区查询用哪个值。





因此基于分区的查询语句如下:


select * from dwd_data_check_report where dt='20190326' limit 100;




至此,阿里云的实验课程结束。





【心得】


1、dataworks工作台做到了流程可视化。


2、虚拟节点(start)和odps_sql节点(insert_data),并配置insert_data依赖start节点。一般虚拟节点属于控制类型节点,在业务流程运行过程中不对数据产生任何影响,仅用于实现对下游节点的运维控制。虚拟节点一般会设置成工作空间根节点,格式为:工作空间名_root。


3、这门课程最后没有告诉学员怎么观察质量核查的结果,因此课程是不完整的。因此我通过结合DataV来实现核查结果的可视化。


4、购买了阿里云大数据服务maxcompute后,阿里云会发送一条短信邀请加入maxcompute开发者社区的钉钉群,遇到技术问题,这里有专门答疑机器人和技术专家答疑。同时阿里云大数据开发专业也有对应的班级群,有班主任和班级助理可以支持答疑,但是响应速度没有开发者社区那么快。


5、如果一个表中的数据很大,全表查询时,会报错误“full scan with all partitions, please specify partition predicates”。解决办法时按照条件(比如时间、地区等)进行分区,然后查询语句指定分区条件和结果条数。详情参考:


https://help.aliyun.com/document_detail/27820.html  若链接失效,则在maxcompute中搜索关键字“分区”。分区的原理是将逻辑归属同一个表的数据,按照不同字段分别存放在不同的物理空间,达到加速查询的效果。




【遗留的课后作业】


1、手工向订单表插入一条错误的数据,执行分析流程,确认结果表是否多了一条记录。目的是掌握dataworks的SQL语句用法和验证这种方法进行实时数据分析的可行性。
————————————————
版权声明:本文为CSDN博主「工匠小能手」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_39295735/article/details/88858225

楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-4-24 08:55

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表