最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

[基础] Hadoop作业提交分析系列(五)

[复制链接]
跳转到指定楼层
楼主
发表于 2014-7-12 23:38:26 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
本帖最后由 乔帮主 于 2014-7-12 23:43 编辑

经过上一篇的分析,我们知道了hadoop的作业提交目标是Cluster还是Local,与conf文件夹内的配置文件参数有着密切关系,不仅如此,其它的很多类都跟conf有关,所以提交作业时切记把conf放到你的classpath中。

  因为Configuration是利用当前线程上下文的类加载器来加载资源和文件的,所以这里我们采用动态载入的方式,先添加好对应的依赖库和资源,然后再构建一个URLClassLoader作为当前线程上下文的类加载器。

[url=][/url]
   public static ClassLoader getClassLoader() {
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        if (parent == null) {
            parent = EJob.class.getClassLoader();
        }
        if (parent == null) {
            parent = ClassLoader.getSystemClassLoader();
        }
        return new URLClassLoader(classPath.toArray(new URL[0]), parent);
    }[url=][/url]

  代码很简单,废话就不多说了。调用例子如下:

   EJob.addClasspath("/usr/lib/hadoop-0.20/conf");
   ClassLoader classLoader = EJob.getClassLoader();
   Thread.currentThread().setContextClassLoader(classLoader);

  设置好了类加载器,下面还有一步就是要打包Jar文件,就是让Project自打包自己的class为一个Jar包,我这里以标准Eclipse工程文件夹布局为例,打包的就是bin文件夹里的class。

[url=][/url]
    public static File createTempJar(String root) throws IOException {
        if (!new File(root).exists()) {
            return null;
        }
        Manifest manifest = new Manifest();
        manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
        final File jarFile = File.createTempFile("EJob-", ".jar", new File(System
                .getProperty("java.io.tmpdir")));

        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                jarFile.delete();
            }
        });

        JarOutputStream out = new JarOutputStream(new FileOutputStream(jarFile),
                manifest);
        createTempJarInner(out, new File(root), "");
        out.flush();
        out.close();
        return jarFile;
    }

    private static void createTempJarInner(JarOutputStream out, File f,
            String base) throws IOException {
        if (f.isDirectory()) {
            File[] fl = f.listFiles();
            if (base.length() > 0) {
                base = base + "/";
            }
            for (int i = 0; i < fl.length; i++) {
                createTempJarInner(out, fl, base + fl.getName());
            }
        } else {
            out.putNextEntry(new JarEntry(base));
            FileInputStream in = new FileInputStream(f);
            byte[] buffer = new byte[1024];
            int n = in.read(buffer);
            while (n != -1) {
                out.write(buffer, 0, n);
                n = in.read(buffer);
            }
            in.close();
        }
    }
[url=][/url]

  这里的对外接口是createTempJar,接收参数为需要打包的文件夹根路径,支持子文件夹打包。使用递归处理法,依次把文件夹里的结构和文件打包到Jar里。很简单,就是基本的文件流操作,陌生一点的就是Manifest和JarOutputStream,查查API就明了。

  好,万事具备,只欠东风了,我们来实践一下试试。还是拿WordCount来举例:

[url=][/url]
        // Add these statements. XXX
        File jarFile = EJob.createTempJar("bin");
        EJob.addClasspath("/usr/lib/hadoop-0.20/conf");
        ClassLoader classLoader =
EJob.getClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCountTest.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
[url=][/url]

  Run as Java Application。。。!!!No job jar file set...异常,看来job.setJarByClass(WordCountTest.class)这个语句设置作业Jar包没有成功。这是为什么呢?

因为这个方法使用了WordCount.class的类加载器来寻找包含该类的Jar包,然后设置该Jar包为作业所用的Jar包。但是我们的作业 Jar包是在程序运行时才打包的,而WordCount.class的类加载器是AppClassLoader,运行后我们无法改变它的搜索路径,所以使用setJarByClass是无法设置作业Jar包的。我们必须使用JobConf里的setJar来直接设置作业Jar包,像下面一样:

((JobConf)job.getConfiguration()).setJar(jarFile);

  好,我们对上面的例子再做下修改,加上上面这条语句。

Job job = new Job(conf, "word count");
// And add this statement. XXX
((JobConf) job.getConfiguration()).setJar(jarFile.toString());

  再Run as Java Application,终于OK了~~

  该种方法的Run on Hadoop使用简单,兼容性好,推荐一试。:)

  本例子由于时间关系,只在Ubuntu上做了伪分布式测试,但理论上是可以用到真实分布式上去的。

     >>点我下载<<






来自群组: Hadoop中国
楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

沙发
发表于 2014-7-15 13:09:36 | 只看该作者
看帖回帖是美德!
板凳
发表于 2014-7-18 09:26:34 | 只看该作者
沙发!沙发!
地板
发表于 2014-7-19 12:17:57 | 只看该作者
真是 收益 匪浅
5#
发表于 2014-7-22 22:01:59 | 只看该作者
我是个凑数的。。。
6#
发表于 2014-7-25 20:48:24 | 只看该作者
不错不错,楼主您辛苦了。。。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-5-3 11:37

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表