最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

Spark如何自定义分区实现多文件输出【完整解决方案】

[复制链接]
跳转到指定楼层
楼主
发表于 2015-9-29 21:58:09 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
本帖最后由 leo201106 于 2015-10-9 15:27 编辑

需求:
[(a1,b1,c1),(a1,b2,c2),(a2,b,c),(a3,b,c)]使用Spark分割成a1,a2,a3三个文件
a1:
a1,b1,c1
a1,b2,c2

a2:
a2,b,c

a3
a3,b,c


从iteblog中搜到使用partitionBy结合saveAshadoopFile可达到此目的,链接如下:
http://www.iteblog.com/archives/1281
但我在使用的时候发现partitionBy是不列在api上的,同时使用eclipse编写scala程序写partitionBy是无法编译通过的,报以下错误:
value partitionBy is not a member of org.apache.spark.rdd.RDD[(Int, Int)]
不知道有没同学使用过这个方法,怎么解决的

注:在spark-shell里,使用map之后是可以使用的,但在eclipse里就是编译不过
----------------------------------------------------------------------------------------------------------------
唉,等了好多天,居然没有一个回应啊,求人不如求已呢,以下是完整解决方案,同时感谢OSChina的"以诚相代"同学的帮助

首先说明一下,上面报错的原因
RDD确实是没有partitionBy方法的,JavaPairRDD有此方法,而Scala中对应的是PairRDDFunctions,两种方法解决:
1,我们可以new一个PairRDDFunctions进行转换,
2,同时spark scala也提供了一个自动转换的方式,我们导入org.apache.spark.SparkContext.rddToPairRDDFunctions
而我在使用“过往记忆” w397090770的代码时import部分,使用的是Eclipse单个类的自动导入,代码里显式的使用rddToPairRDDFunctions,导致未引入此类从而导致报错。

使用 w397090770的可以基本完成上面的需求,但是实际输出时文件格式与我们原始的无法保持一致,用同样的分隔符,不增加多余的列(saveHadoopFile会将分区的键值作为第一列数据)


我们仍然使用saveHadoopFile进行文件名的自定义,但不对每行进行分割,而时生成文件名时对键进行分割并返回指定的列名,文件输出时我们只输出key,而不输出value(将value置成空字符串),以下是完整代码:

[Scala] 纯文本查看 复制代码
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import org.apache.spark.SparkContext.rddToPairRDDFunctions;//这一行没有显式的调用,但一定要引入,不然会报错

/**
 * Leo Dj @2015-10-09
 */
object MultipleTextOutput {
  def main(args: Array[String]) {
    val filePath = "/user/input/operation.log";
    val savePath = "/user/output/merge";
    
    val conf = new SparkConf().setAppName("SplitTest")
    val   sc = new SparkContext(conf)
    
    case class RDDMultipleTextOutputFormatter() extends MultipleTextOutputFormat[Any, Any] {
            //自定义保存文件名
      override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =  {
        val separator = ",";
        //取72列的值,作为文件名
        key.asInstanceOf[String].split(separator)(72);
      }
    }
    
    //读取文件后,不进行split操作,直接将整行内容看作key,
    sc.textFile(filePath).map(x=>(x,"")).
      partitionBy(new HashPartitioner(3)).saveAsHadoopFile(savePath, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormatter])
  
  }
}


最后可以达到如下目标:
按照指定列进行文件分割
文件名会分区的键值
文件内容不增加多余的列,同时分隔符可自由指定






楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-5-4 04:06

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表