最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

阿里云大数据项目实战开发

[复制链接]
跳转到指定楼层
楼主
发表于 2019-3-21 17:32:28 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
本帖最后由 168主编 于 2019-3-21 17:34 编辑

任务目标:从阿里云数据库中读取表1:hdp6_result 和表2:hdp6_locationresult
对取出的数据进行简单处理后,在数据库中新建表3:hdp5_resultsave
并将数据以分区表的形式存入表3当中。

1.数据库读取流程步骤1:链接Spark master
[AppleScript] 纯文本查看 复制代码
SparkConf conf = new SparkConf()
                    .setAppName("ResultData");
                    .setMaster("local");
JavaSparkContext ctx = new JavaSparkContext(conf);  



步骤2:初始化OdpsContext,并进行数据库的select
[AppleScript] 纯文本查看 复制代码
OdpsContext sqlContext = new OdpsContext(ctx);
String sql = "select * from hdp6_result limit 100";
DataFrame df = sqlContext.sql(sql);
df.show(100);
df.printSchema();

2.多表查询,返回一个dataframe方法一:SQL语句多表查询,返回一个dataframe
[AppleScript] 纯文本查看 复制代码
String sql = "select result_value,lower_tolerance,upper_tolerance,"
                    + "time_stamp,type_number from hdp6_result "
                    + "INNER join hdp6_locationresult ON "
                    + "hdp6_result.location_result_uid = hdp6_locationresult.location_result_uid";
            DataFrame df = sqlContext.sql(sql);
            df.show();

重要事项!!!:经过验证,SQL语句多表查询的方式,并不适用于阿里云ODPS,可能是由于多表查询不利于官方进行计费。
方法二:每张表单独返回一个dataframe,将dataframe进行组合,返回一个新的dataframe
[AppleScript] 纯文本查看 复制代码
String sql1 = "select * from hdp6_result limit 100";
DataFrame df1 = sqlContext.sql(sql1);
df1.show(100);
df1.printSchema();

String sql2 = "select * from hdp6_locationresult limit 100";
DataFrame df2 = sqlContext.sql(sql2);
df2.show(100);
df2.printSchema();

DataFrame df0 = df2.join(df1, df1.col("location_result_uid").equalTo(df2.col("location_result_uid")));;
System.out.println("----3----");
df0.show(200);
df0.printSchema();

3.对数据进行简单处理并生成新的dataframe     
[AppleScript] 纯文本查看 复制代码
      //---------------------------------------表格location_result列数据以getString形式取出
        List<String> resultlist = df1.select("location_result").distinct().javaRDD().map(new Function<Row, String>() {
            public String call(Row row) {
                return String.valueOf(row.getString(0));
            }
        }).distinct().collect();
        //---------------------------------------开始循环处理数据
        for (String locationresult : resultlist) {
            df2 = df1.filter("location_result = '" + locationresult + "' ");//特定location的内容
            df2.cache();//df2持久化
            DataFrame valueByType0 = df2.groupBy("tolerance", "type_number")
                    .agg(avg("result_value"),count("result_value"),stddev("result_value"))
                    .filter(("count(result_value) >= " + PART_COUNT_LOW_LIMIT))
                    .orderBy("tolerance", "type_number");//拆出公差,型号,平均和数量,方差
            List<Row> valueByType = valueByType0.javaRDD().map(new Function<Row, Row>() {
                public Row call(Row row) {
                    row.getString(0);//tolerance
                    row.getString(1);//type
                    row.getDouble(2);//avg
                    row.getLong(3);//count
                    row.getDouble(4);//std
                    return row;
                }
            }).collect();
            //-----------------------------------数组,String tolerance 0,String type 1,Double avg 2,Long count 3,Double std 4
            for (Row row : valueByType) {
                if (processed.contains(row.getString(0))) {
                    msg = "processed tolerance" + row.getString(0);
                    System.out.println(msg);
                } else if ((valueByType0.filter("tolerance = " + row.getString(0))).count() == 1) {
                    msg = "only one line" + row.getString(0);
                    System.out.println(msg);
                } else {
                    msg = "---------DATA Hangding--------";
                    System.out.println(msg);
                    processed.add(row.getString(0));//将该公差加入已处理列表
                    List<Row> valueByTypeOneTol = valueByType0.filter("tolerance = '" + row.getString(0)+"'").collectAsList();
                    double sum = 0;
                    long count = 0;
                    double stdsum = 0;
                    for (Row row1 : valueByTypeOneTol) //计算总平均
                    {
                        sum += row1.getDouble(2) * row1.getLong(3);
                        count += row1.getLong(3);
                        stdsum += Math.pow(row1.getDouble(4), 2);
                    }
                    msg = "---------Print result of avg&std--------";
                    System.out.println(msg);
                    double avg = sum / count;
                    double std = Math.pow((stdsum / valueByTypeOneTol.size()), 0.5);
                    System.out.println("avg = " + avg + ", std = " + std);

                    msg = "---------Result Judgement--------";
                    System.out.println(msg);
                    for (Row row2 : valueByTypeOneTol) {
                        System.out.println("delta avg  = " + (avg - row2.getDouble(2)));
                        String tol[] = row2.getString(0).split("/");
                        double tolSpan = Math.abs(Double.valueOf(tol[1]) - Double.valueOf(tol[0]));
                        System.out.println("tolSpan = " + tolSpan);
                        if (tolSpan <= 0) //判断公差是否为0
                        {
                            msg = "no tolerance or tolerance equals to 0";
                            System.out.println(msg);
                        } else {
                            if ((avg - row2.getDouble(2)) >= tolSpan * AVG_TOL)//均值判断 平均值 - 单个值 大为坏
                            {
                                msg = "typeno = " + row2.getString(1) + "avg need to be handled";
                                System.out.println(msg);
                                String typeno = row2.getString(1);
                                errorMessage = "type = " + typeno + "avg is less than the mean, please check if there is any problems in this station\n";
                            } else {
                                msg = "typeno = " + row.getString(1) + "do not need to be handled";
                                System.out.println(msg);
                            }
                            System.out.println("delta std  = " + (row.getDouble(4) - std));
                            if ((row.getDouble(4) - std) >= tolSpan * STD_TOL)//方差判断 单个值 - 平均值  大为坏
                            {
                                msg = "typeno = " + row.getString(1) + "std need to be handled";
                                System.out.println(msg);
                                String typeno = row2.getString(1);
                                errorMessage = "type = " + typeno + "std is greater than the mean, please check if there is any problems in this station\n";
                            } else {
                                msg = "typeno = " + row.getString(1) + "do not need to be handled";
                                System.out.println(msg);
                            }
                        }
                    }
                }
            }

4.新建表格用于存储处理后的数据
[AppleScript] 纯文本查看 复制代码
//----------------------------------------------新建一个表格
sqlContext.sql("Create table if not exists scxtest001 (uid varchar(500), time_stamp String) partitioned by (pt String)");

5.存储处理后的dataframe方法一:以临时表的形式存入新建表格注意,目前阿里云只有spark 2.3.0版本才支持建立临时视图
方法二:INSERTINTO和SAVEASTABLE的形式存入新建表格关于上述两个方法,请关注我个人的git lab 项目:
https://gitlab.com/uaes-tef3/hdev6online


作者:旺达哒哒哒
链接:https://www.jianshu.com/p/f69cca15b521
来源:简书



楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-4-20 02:12

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表