168大数据

标题: Spark如何自定义分区实现多文件输出【完整解决方案】 [打印本页]

作者: leo201106    时间: 2015-9-29 21:58
标题: Spark如何自定义分区实现多文件输出【完整解决方案】
本帖最后由 leo201106 于 2015-10-9 15:27 编辑

需求:
[(a1,b1,c1),(a1,b2,c2),(a2,b,c),(a3,b,c)]使用spark分割成a1,a2,a3三个文件
a1:
a1,b1,c1
a1,b2,c2

a2:
a2,b,c

a3
a3,b,c


从iteblog中搜到使用partitionBy结合saveAsHadoopFile可达到此目的,链接如下:
http://www.iteblog.com/archives/1281
但我在使用的时候发现partitionBy是不列在api上的,同时使用eclipse编写scala程序写partitionBy是无法编译通过的,报以下错误:
value partitionBy is not a member of org.apache.spark.rdd.RDD[(Int, Int)]
不知道有没同学使用过这个方法,怎么解决的

注:在spark-shell里,使用map之后是可以使用的,但在eclipse里就是编译不过
----------------------------------------------------------------------------------------------------------------
唉,等了好多天,居然没有一个回应啊,求人不如求已呢,以下是完整解决方案,同时感谢OSChina的"以诚相代"同学的帮助

首先说明一下,上面报错的原因
RDD确实是没有partitionBy方法的,JavaPairRDD有此方法,而Scala中对应的是PairRDDFunctions,两种方法解决:
1,我们可以new一个PairRDDFunctions进行转换,
2,同时spark scala也提供了一个自动转换的方式,我们导入org.apache.spark.SparkContext.rddToPairRDDFunctions
而我在使用“过往记忆” w397090770的代码时import部分,使用的是Eclipse单个类的自动导入,代码里显式的使用rddToPairRDDFunctions,导致未引入此类从而导致报错。

使用 w397090770的可以基本完成上面的需求,但是实际输出时文件格式与我们原始的无法保持一致,用同样的分隔符,不增加多余的列(saveHadoopFile会将分区的键值作为第一列数据)


我们仍然使用saveHadoopFile进行文件名的自定义,但不对每行进行分割,而时生成文件名时对键进行分割并返回指定的列名,文件输出时我们只输出key,而不输出value(将value置成空字符串),以下是完整代码:

[Scala] 纯文本查看 复制代码

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import org.apache.spark.SparkContext.rddToPairRDDFunctions;//这一行没有显式的调用,但一定要引入,不然会报错

/**
* Leo Dj @2015-10-09
*/
object MultipleTextOutput {
  def main(args: Array[String]) {
    val filePath = "/user/input/operation.log";
    val savePath = "/user/output/merge";
   
    val conf = new SparkConf().setAppName("SplitTest")
    val   sc = new SparkContext(conf)
   
    case class RDDMultipleTextOutputFormatter() extends MultipleTextOutputFormat[Any, Any] {
            //自定义保存文件名
      override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =  {
        val separator = ",";
        //取72列的值,作为文件名
        key.asInstanceOf[String].split(separator)(72);
      }
    }
   
    //读取文件后,不进行split操作,直接将整行内容看作key,
    sc.textFile(filePath).map(x=>(x,"")).
      partitionBy(new HashPartitioner(3)).saveAsHadoopFile(savePath, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormatter])
  
  }
}


最后可以达到如下目标:
按照指定列进行文件分割
文件名会分区的键值
文件内容不增加多余的列,同时分隔符可自由指定











欢迎光临 168大数据 (http://www.bi168.cn/) Powered by Discuz! X3.2