package com.cere;
import org.apache.hadoop.conf.Configuration;
// Demonstrates reading a single Hadoop configuration resource and looking up
// properties, including a default value for a key the resource does not define.
public class ReadConf {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.addResource("./new-conf/configuration-1.xml");
        // "breadth" is not present in the resource, so the supplied
        // fallback value is returned by the two-argument get().
        String color = conf.get("color");
        int size = conf.getInt("size", 0);
        String weight = conf.get("weight");
        String breadth = conf.get("breadth", "wide");
        System.out.printf("color = %s\n", color);
        System.out.printf("size = %d\n", size);
        System.out.printf("weight = %s\n", weight);
        System.out.printf("wide = %s\n", breadth);
    }
}
2.9.2 ReadMulConf.java的内容如下:
package com.cere;
import org.apache.hadoop.conf.Configuration;
// Demonstrates layering two Hadoop configuration resources: the one added
// later overrides earlier values, except for properties marked final.
public class ReadMulConf {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Resources are applied in order; configuration-2.xml shadows
        // configuration-1.xml unless a property is declared final there.
        conf.addResource("./new-conf/configuration-1.xml");
        conf.addResource("./new-conf/configuration-2.xml");
        String color = conf.get("color");
        int size = conf.getInt("size", 0);
        String weight = conf.get("weight");
        String breadth = conf.get("breadth", "wide");
        System.out.printf("color = %s\n", color);
        System.out.printf("size = %d\n", size);
        System.out.printf("weight = %s\n", weight);
        System.out.printf("wide = %s\n", breadth);
    }
}
2.9.3 编译:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-5/p2$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ src/*.java
2.9.4 打包:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-5/p2$ jar -cvf rmc.jar -C ./classes/ .
2.9.5 执行:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-5/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar rmc.jar com.cere.ReadMulConf
输出结果:
color = yellow
size = 12
weight = heavy
wide = wide
2.9.6 结果分析:size属性值原先是10,在configuration-2.xml中被改成12;weight在configuration-1.xml是heavy,尽管configuration-2.xml的weight属性是light,但在configuration-1.xml中weight的属性是final,不可以被更改,于是仍然是heavy。
2.10 例子3 可变的扩展
2.10.1 源代码是ReadMulConfD.java
package com.cere;
import org.apache.hadoop.conf.Configuration;
public class ReadMulConfD{
public static void main(String[] args){
Configuration conf = new Configuration();
conf.addResource("./new-conf/configuration-1.xml");
conf.addResource("./new-conf/configuration-2.xml");
System.out.printf("color = %s\n", conf.get("color"));
System.out.printf("size = %d\n", conf.getInt("size", 0));
System.out.printf("weight = %s\n", conf.get("weight"));
System.out.printf("wide = %s\n", conf.get("breadth", "wide"));
System.out.printf("size-weight = %s\n", conf.get("size-weight"));
System.out.printf("------------------------\nreset size\n");
System.setProperty("size", "14");
System.out.printf("size-weight = %s\n", conf.get("size-weight"));
}
}<span style="font-family: 宋体, 'sans serif', tahoma, verdana, helvetica;"> </span>
2.10.2 编译:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-5/p3$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ src/*.java
2.10.3 打包:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-5/p3$ jar -cvf rmcd.jar -C ./classes/ .
2.10.4 执行:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-5/p3$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar rmcd.jar com.cere.ReadMulConfD
输出结果:
color = yellow
size = 12
weight = heavy
wide = wide
size-weight = 12,heavy
------------------------
reset size
size-weight = 14,heavy
2.10.5 解释:在读取了配置之后,再执行系统属性的配置将size设置成14,于是再次读取size-weight的时候,size就变成14。
3. 配置管理
3.1 开发Hadoop时候,需要在本地运行和集群之间进行切换。可以在几个集群上工作,或者在本地伪分布式集群上测试。伪分布式集群是其守护进程运行在本地的集群。
3.2 hadoop-1.1.2的子目录conf放上3个配置文件,hadoop-local.xml,hadoop-localhost.xml和hadoop-cluster.xml。hadoop在三种配置之间切换运行。
3.3 在客户端执行不同的配置:
"hadoop fs -conf conf/hadoop-localhost.xml -ls ."
这样列出来的就是本地集群的文件。
3.4 hadoop-local.xml文件:
<?xml version="1.0"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>file:///</value>
</property>
<property>
<name>mapred.job.tracker</name>
<value>local</value>
</property>
</configuration>
3.5 hadoop-localhost.xml文件:
<?xml version="1.0"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>
<property>
<name>mapred.job.tracker</name>
<value>localhost:9001</value>
</property>
</configuration>
3.6 hadoop-cluster.xml文件:
<?xml version="1.0"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://namenode/</value>
</property>
<property>
<name>mapred.job.tracker</name>
<value>jobtracker:8021</value>
</property>
</configuration>
3.7 执行1:brian@brian-laptop:~/usr/hadoop/hadoop-1.1.2$ ./bin/hadoop fs -conf conf/hadoop-localhost.xml -ls .
这个会显示hdfs上的当前用户的目录下的文件。
3.8 执行2:brian@brian-laptop:~/usr/hadoop/hadoop-1.1.2$ ./bin/hadoop fs -conf conf/hadoop-local.xml -ls .
这个会显示/home/brian/usr/hadoop/hadoop-1.1.2目录下的文件。
4. 辅助类 GenericOptionsParser, Tool和ToolRunner
4.1 GenericOptionsParser是一个类,用来解释常用的Hadoop命令行选项,并根据需要,为Configuration对象设置相应的取值。
4.2 通常不直接使用GenericOptionsParser,更方便的方式是实现Tool接口,通过ToolRunner来运行应用程序,ToolRunner内部调用GenericOptionsParser。
4.3 Configurable是一个接口。Configured类是Configurable的一个实现。ConfigurationPrinter是Configured的一个子类。
Tool是一个接口，它继承了Configurable接口。
4.3 Tool实现示例,用于打印一个Configuration对象的属性
4.3.1 代码ConfigurationPrinter.java
package com.cere;
import java.util.Map;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// Tool implementation that dumps every property of its Configuration.
// Run through ToolRunner so generic Hadoop command-line options (-conf, -D,
// etc.) are parsed into the Configuration before run() executes.
public class ConfigurationPrinter extends Configured implements Tool {
    static {
        // Register the site/default files as default resources once, before
        // any Configuration instance is materialized.
        Configuration.addDefaultResource("./conf/hdfs-default.xml");
        Configuration.addDefaultResource("./conf/hdfs-site.xml");
        Configuration.addDefaultResource("./conf/mapred-default.xml");
        Configuration.addDefaultResource("./conf/mapred-site.xml");
    }

    /** Prints each key/value pair of the tool's Configuration; returns 0. */
    @Override
    public int run(String[] args) throws Exception {
        for (Map.Entry<String, String> prop : getConf()) {
            System.out.printf("%s = %s\n", prop.getKey(), prop.getValue());
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new ConfigurationPrinter(), args));
    }
}
4.3.2 编译：brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-5/p4$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ src/*.java
4.3.3 打包:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-5/p4$ jar -cvf cp.jar -C ./classes/ .
package com.cere;
import java.util.Random;
/**
 * Weighted random selection: picks an index with probability proportional
 * to its weight.
 */
public class RandomUtil {
    private final static Random DEFAULT_RANDOM = new Random();

    /** Selects an index using the shared default Random source. */
    public static int randomSelect(int[] weights) {
        return randomSelect(DEFAULT_RANDOM, weights);
    }

    /**
     * Selects an index at random with probability weights[i] / sum(weights).
     *
     * @param r       source of randomness
     * @param weights non-negative weights; must be non-empty with a positive sum
     * @return the selected index
     * @throws IllegalArgumentException if weights is null, empty, or sums to <= 0
     */
    public static int randomSelect(Random r, int[] weights) {
        if (weights == null || weights.length == 0) {
            throw new IllegalArgumentException("weights must not be an empty array");
        }
        int s = 0;
        for (int i : weights) {
            s += i;
        }
        if (s <= 0) {
            // r.nextInt(0) would throw an opaque IllegalArgumentException;
            // fail with a message that names the actual problem.
            throw new IllegalArgumentException("weights must sum to a positive value");
        }
        int target = r.nextInt(s);
        System.out.printf("target = %d\n", target);
        int accumulator = 0;
        // Walk the cumulative distribution; the first prefix sum that
        // exceeds target identifies the selected index.
        for (int i = 0; i < weights.length; i++) {
            System.out.printf("\ni = %d\n", i);
            // BUG FIX: the original read "accumulator += weights;" which adds
            // the array reference and does not compile; accumulate the element.
            accumulator += weights[i];
            System.out.printf("accumulator = %d\n", accumulator);
            if (accumulator > target) {
                System.out.printf("return i = %d\n", i);
                return i;
            }
        }
        System.out.printf("return 0\n");
        return 0;
    }

    public static void main(String[] args) {
        int[] expected = new int[] {0, 0, 1, 2, 4, 4};
        randomSelect(expected);
    }
}
package com.cere;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Random;
import org.junit.Assert;
import org.junit.Test;
/**
 * Exhaustive test of RandomUtil.randomSelect: for every possible value the
 * Random source can yield, the selected index must match the cumulative
 * weight distribution.
 */
public class TestRandomUtil {
    @Test
    public void testSelect() {
        int[] weights = {2, 1, 3, 0, 4};
        int s = 0;
        for (int i : weights) {
            s += i;
        }
        // expected[t] is the index randomSelect must return when nextInt(s)
        // yields t (targets 0-1 -> index 0, 2 -> 1, 3-5 -> 2, 6-9 -> 4).
        int[] expected = new int[] {0, 0, 1, 2, 2, 2, 4, 4, 4, 4};
        for (int i = 0; i < s; i++) {
            Random r = mock(Random.class);
            when(r.nextInt(s)).thenReturn(i);
            // BUG FIX: the original compared the whole expected array with the
            // returned int (assertEquals(expected, ...)), which can never
            // pass; compare against the expected element for this target.
            Assert.assertEquals(expected[i], RandomUtil.randomSelect(r, weights));
        }
    }
}
package com.ifis;
import java.io.IOException;
import static org.mockito.Mockito.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.OutputCollector;
import org.junit.*;
/**
 * Unit test for MaxTemperatureMapper using a mocked OutputCollector
 * (old org.apache.hadoop.mapred API).
 */
public class TestMaxTemperatureMapper {
    // Feeds one well-formed NCDC record through the mapper and verifies that
    // the year and the (negative) temperature reading are emitted.
    @Test
    public void processValidRecord() throws IOException, InterruptedException {
        Text record = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
                "99999V0203201N00261220001CN9999999N9-00111+99999999999");
        OutputCollector<Text, IntWritable> collector = mock(OutputCollector.class);
        new MaxTemperatureMapper().map(null, record, collector, null);
        verify(collector).collect(new Text("1950"), new IntWritable(-11));
    }
}
package com.ifis;
import java.io.IOException;
import java.util.Iterator;
import java.util.Arrays;
import static org.mockito.Mockito.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.OutputCollector;
import org.junit.*;
/**
 * Unit test for MaxTemperatureReducer using a mocked OutputCollector
 * (old org.apache.hadoop.mapred API).
 */
public class TestMaxTemperatureReducer {
    // Reduces two temperatures for one year and verifies that only the
    // maximum value is collected.
    @Test
    public void processValidRecord() throws IOException, InterruptedException {
        Text year = new Text("1950");
        Iterator<IntWritable> temps =
                Arrays.asList(new IntWritable(10), new IntWritable(5)).iterator();
        OutputCollector<Text, IntWritable> collector = mock(OutputCollector.class);
        new MaxTemperatureReducer().reduce(year, temps, collector, null);
        verify(collector).collect(year, new IntWritable(10));
    }
}
欢迎光临 168大数据 (http://www.bi168.cn/) | Powered by Discuz! X3.2 |