最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

[基础] Hadoop作业提交分析系列(四)

[复制链接]
跳转到指定楼层
楼主
发表于 2014-7-12 23:37:03 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x

前面我们所分析的部分其实只是hadoop作业提交的前奏曲,真正的作业提交代码是在MR程序的main里,RunJar在最后会动态调用这个main,在(二)里有说明。我们下面要做的就是要比RunJar更进一步,让作业提交能在编码时就可实现,就像Hadoop Eclipse Plugin那样可以对包含Mapper和Reducer的MR类直接Run on Hadoop。

  一般来说,每个MR程序都会有这么一段类似的作业提交代码,这里拿WordCount的举例:

[url=][/url]
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
[url=][/url]

  首先要做的是构建一个Configuration对象,并进行参数解析。接着构建提交作业用的Job对象,并设置作业Jar包、对应Mapper和Reducer类、输入输出的Key和Value的类及作业的输入和输出路径,最后就是提交作业并等待作业结束。这些只是比较基本的设置参数,实际还支持更多的设置参数,这里就不一一介绍,详细的可参考API文档。

  一般分析代码都从开始一步步分析,但我们的重点是分析提交过程中发生的事,这里我们先不理前面的设置对后面作业的影响,我们直接跳到作业提交那一步进行分析,当碰到问题需要分析前面的代码时我会再分析。

  当调用job.waitForCompletion时,其内部调用的是submit方法来提交,如果传入参数为ture则及时打印作业运作信息,否则只是等待作业结束。submit方法进去后,还有一层,里面用到了job对象内部的jobClient对象的submitJobInternal来提交作业,从这个方法才开始做正事。进去第一件事就是获取jobId,用到了jobSubmitClient对象,jobSubmitClient对应的类是JobSubmissionProtocol的实现之一(目前有两个实现,JobTracker和LocalJobRunner),由此可判断出jobSubmitClient对应的类要么是JobTracker,要么是LocalJobRunner。呃,这下有点想法了,作业提交是上到JobTracker去,还是在本地执行?可能就是看这个jobSunmitClient初始化时得到的是哪个类的实例了,我们可以稍稍的先往后看看,你会发现submitJobInternal最后用了jobSubmitClient.submitJob(jobId)来提交作业,再稍稍看看JobTracker和LocalJobRunner的submitJob实现,看来确实是这么回事。好,那我们就先跳回去看看这个jobSubmitClient是如何初始化的。在JobClient的init中我们可以发现jobSubmitClient的初始化语句:

    String tracker = conf.get("mapred.job.tracker", "local");
    if ("local".equals(tracker)) {
      this.jobSubmitClient = new LocalJobRunner(conf);
    } else {
      this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
    }     

  哈,跟conf中的mapred.job.tracker属性有关,如果你没设置,那默认得到的值就是local,jobSubmitClient也就会被赋予LocalJobRunner的实例。平时,我们开发时一般都只是引用lib里面的库,不引用conf文件夹里的配置文件,这里就能解释为什么我们直接Run as Java Application时,作业被提交到Local去运行了,而不是Hadoop Cluster中。那我们把conf文件夹添加到classpath,就能Run on Hadoop了么?目前下结论尚早,我们继续分析(你添加了conf文件夹后,可以提交试一试,会爆出一个很明显的让你知道还差什么的错误,这里我就卖卖官子,先不说)。

  jobId获取到后,在SystemDir基础上加jobId构建了提交作业的目录submitJobDir,SystemDir由JobClient的getSystemDir方法得出,这个SystemDir在构建fs对象时很重要,确定了返回的fs的类型。下去的configureCommandLineOptions方法主要是把作业依赖的第三方库或文件上传到fs中,并做classpath映射或Symlink,以及一些参数设置,都是些细微活,这里不仔细分析。我们主要关心里面的两个地方,一个是:

FileSystem fs = getFs();

  看上去很简单,一句话,就是获取FileSystem的实例,但其实里面绕来绕去,有点头晕。因为Hadoop对文件系统进行了抽象,所以这里获得fs实例的类型决定了你是在hdfs上操作还是在local fs上操作。好了,我们冲进去看看。

[url=][/url]
public synchronized FileSystem getFs() throws IOException {
    if (this.fs == null) {
      Path sysDir = getSystemDir();
      this.fs = sysDir.getFileSystem(getConf());
    }
    return fs;
  }
[url=][/url]

  看见了吧,fs是由sysDir的getFileSystem返回的。我们再冲,由于篇幅,下面就只列出主要涉及的语句。

[url=][/url]
    FileSystem.get(this.toUri(), conf);
        ↓
    CACHE.get(uri, conf);
        ↓
    fs = createFileSystem(uri, conf);
        ↓
    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
    if (clazz == null) {
      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
    }
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    fs.initialize(uri, conf);
    return fs;
[url=][/url]

  又是跟conf有关,看来conf是得实时跟住的。这里用到了Java的反射技术,用来动态生成相应的类实例。其中的class获取与uri.getScheme有密切关系,而uri就是在刚才的sysDir基础上构成,sysDir的值又最终是由jobSubmitClient的实例决定的。如果jobSubmitClient是JobTracker的实例,那Scheme就是hdfs。如果是LocalJobRunner的实例,那就是file。从core-default.xml你可以找到如下的信息:

[url=][/url]
<property>
  <name>fs.file.impl</name>
  <value>org.apache.hadoop.fs.LocalFileSystem</value>
  <description>The FileSystem for file: uris.</description>
</property>
<property>
  <name>fs.hdfs.impl</name>
  <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
  <description>The FileSystem for hdfs: uris.</description>
</property>
[url=][/url]

  所以在前面的作业提交代码中,在初始化Job实例时,很多事已经决定了,由conf文件夹中的配置文件决定。Configuration是通过当前线程上下文的类加载器来加载类和资源文件的,所以要想Run on Hadoop,第一步必须要让Conf文件夹进入Configuration的类加载器的搜索路径中,也就是当前线程上下文的类加载器。

  第二个要注意的地方是:

[url=][/url]
    String originalJarPath = job.getJar();
    if (originalJarPath != null) {           // copy jar to JobTracker's fs
      // use jar name if job is not named.
      if ("".equals(job.getJobName())){
        job.setJobName(new Path(originalJarPath).getName());
      }
      job.setJar(submitJarFile.toString());
      fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
      fs.setReplication(submitJarFile, replication);
      fs.setPermission(submitJarFile, new FsPermission(JOB_FILE_PERMISSION));
    } else {
      LOG.warn("No job jar file set.  User classes may not be found. "+
               "See JobConf(Class) or JobConf#setJar(String).");
    }
[url=][/url]

  因为client在提交作业到Hadoop时需要把作业打包成jar,然后copy到fs的submitJarFile路径中。如果我们想Run on Hadoop,那就必须自己把作业的class文件打个jar包,然后再提交。在Eclipse中,这就比较容易了。这里假设你启用了自动编译功能。我们可以在代码的开始阶段加入一段代码用来打包bin文件夹里的class文件为一个jar包,然后再执行后面的常规操作。

  在configureCommandLineOptions方法之后,submitJobInternal会检查输出文件夹是否已存在,如果存在则抛出异常。之后,就开始划分作业数据,并根据split数得到map tasks的数量。最后,就是把作业配置文件写入submitJobFile,并调用jobSubmitClient.submitJob(jobId)最终提交作业。

  至此,对Hadoop的作业提交分析也差不多了,有些地方讲的比较啰嗦,有些又讲得点到而止,但大体的过程以及一些较重要的东西还是说清楚了,其实就是那么回事。下去的文章我们会在前面的jobUtil基础上增加一些功能来支持Run on Hadoop,其实主要就是增加一个打包Jar的方法。

  To be continued...



楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

沙发
发表于 2014-7-14 20:14:35 | 只看该作者
沙发!沙发!
板凳
发表于 2014-7-17 11:51:44 | 只看该作者
不知该说些什么。。。。。。就是谢谢
地板
发表于 2014-7-19 00:27:57 | 只看该作者
有竞争才有进步嘛
5#
发表于 2014-7-21 14:55:46 | 只看该作者
写的真的很不错
6#
发表于 2014-7-25 09:20:44 | 只看该作者
过来看看的
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-5-3 01:29

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表