The question is how to chain jobs in Hadoop, using Hadoop Streaming only.
hadoopJob = Configuration + Execution
where,
Configuration: all the settings that make the Execution possible.
Execution: the set of executable or script files that accomplish the desired task; in other words, the map and reduce steps of our job.
Configuration = hadoopEnvironment + userEnvironment
where,
hadoopEnvironment: the settings of Hadoop's general environment. This general environment is defined from resources, i.e. the xml files that live in the $HADOOP_HOME/conf directory. For example, some resources are core-site.xml, mapred-site.xml and hadoop-site.xml, which define, among other things, the hdfs temporary directory, the job tracker and the number of cluster nodes.
userEnvironment: the arguments the user specifies when running a job. In Hadoop these arguments are called options.
userEnvironment = genericOptions + streamingOptions
where,
genericOptions: options that are generic, i.e. they apply to every streaming job independently of the job itself. They are handled by the GenericOptionsParser.
streamingOptions: options that are job-specific, i.e. they apply to a certain job. For example, every job has its own input and output directories or files. They are handled by StreamJob.
Schematically:
                     hadoopJob
                        /\
                       /  \
                      /    \
                     /      \
           Configuration    Execution
                /\              |
               /  \             |
              /    \    executable or script files
             /      \
            /        \
  hadoopEnvironment  userEnvironment
          |                /\
          |               /  \
          |              /    \
  $HADOOP_HOME/conf     /      \
                       /        \
            genericOptions    streamingOptions
                  |                  |
                  |                  |
         GenericOptionsParser     StreamJob
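For example, in the streaming command below, -D mapred.reduce.tasks=1 is a generic option (handled by the GenericOptionsParser), while -mapper, -reducer, -input and -output are streaming options (handled by StreamJob):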
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.3.jar -D mapred.reduce.tasks=1 -mapper m.py -reducer r.py -input /in.txt -output /out/
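To chain streaming jobs from Java code, we start from a plain skeleton class: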
// import everything needed

public class TestChain
{
    //code here

    public static void main( String[] args) throws Exception
    {
        //code here
    }//end main

}//end TestChain
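To have the generic options handled for us, the class extends Configured and implements Tool, and ToolRunner drives it: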
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
// import everything else needed

public class TestChain extends Configured implements Tool
{
    public int run( String[] args) throws Exception
    {
        //code here
        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        // ToolRunner handles generic command line options
        int res = ToolRunner.run( new Configuration(), new TestChain(), args);
        System.exit( res);
    }//end main

}//end TestChain
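The streaming options of a job can be packed into a String array of the exact form StreamJob expects, for example: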
String[] example = new String[]
{
    "-mapper" , "m.py" ,
    "-reducer" , "r.py" ,
    "-input" , "in.txt" ,
    "-output" , "/out/"
};
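StreamJob.createJob() parses such an array and returns a ready JobConf, so the configurations of three chained jobs look like this: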
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;
// import everything else needed

public class TestChain extends Configured implements Tool
{
    public int run( String[] args) throws Exception
    {
        String[] job1 = new String[]
        {
            "-mapper" , "m1.py" ,
            "-reducer" , "r1.py" ,
            "-input" , "in1.txt" ,
            "-output" , "/out1/"
        };
        //createJob is static; it parses the streaming options into a JobConf
        JobConf job1Conf = StreamJob.createJob( job1);
        //code here

        String[] job2 = new String[]
        {
            "-mapper" , "m2.py" ,
            "-reducer" , "r2.py" ,
            "-input" , "in2.txt" ,
            "-output" , "/out2/"
        };
        JobConf job2Conf = StreamJob.createJob( job2);
        //code here

        String[] job3 = new String[]
        {
            "-mapper" , "m3.py" ,
            "-reducer" , "r3.py" ,
            "-input" , "in3.txt" ,
            "-output" , "/out3/"
        };
        JobConf job3Conf = StreamJob.createJob( job3);
        //code here

        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        // ToolRunner handles generic command line options
        int res = ToolRunner.run( new Configuration(), new TestChain(), args);
        System.exit( res);
    }//end main

}//end TestChain
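A JobConf on its own does not execute anything. JobClient.runJob() submits a job and blocks until it completes, so calling it after each configuration runs the three jobs strictly one after another: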
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;

public class TestChain extends Configured implements Tool
{
    public int run( String[] args) throws Exception
    {
        String[] job1 = new String[]
        {
            "-mapper" , "m1.py" ,
            "-reducer" , "r1.py" ,
            "-input" , "in1.txt" ,
            "-output" , "/out1/"
        };
        JobConf job1Conf = StreamJob.createJob( job1);
        //runJob submits the job and blocks until it completes
        JobClient.runJob( job1Conf);

        String[] job2 = new String[]
        {
            "-mapper" , "m2.py" ,
            "-reducer" , "r2.py" ,
            "-input" , "in2.txt" ,
            "-output" , "/out2/"
        };
        JobConf job2Conf = StreamJob.createJob( job2);
        JobClient.runJob( job2Conf);

        String[] job3 = new String[]
        {
            "-mapper" , "m3.py" ,
            "-reducer" , "r3.py" ,
            "-input" , "in3.txt" ,
            "-output" , "/out3/"
        };
        JobConf job3Conf = StreamJob.createJob( job3);
        JobClient.runJob( job3Conf);

        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        // ToolRunner handles generic command line options
        int res = ToolRunner.run( new Configuration(), new TestChain(), args);
        System.exit( res);
    }//end main

}//end TestChain
In contrast to JobClient, JobControl "works" with Job objects. For example, when we call x.addDependingJob(y), both x and y are Job objects. The same holds for JobControl.addJob(jobtoadd): jobtoadd is a Job object. Every Job object is created from a JobConf object. Back to the code, we have:
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;

public class TestChain extends Configured implements Tool
{
    public int run( String[] args) throws Exception
    {
        //TestChain below is an arbitrary name for the group
        JobControl jobc = new JobControl( "TestChain");

        String[] job1args = new String[]
        {
            "-mapper" , "m1.py" ,
            "-reducer" , "r1.py" ,
            "-input" , "in1.txt" ,
            "-output" , "/out1/"
        };
        JobConf job1Conf = StreamJob.createJob( job1args);
        Job job1 = new Job( job1Conf);
        jobc.addJob( job1);

        String[] job2args = new String[]
        {
            "-mapper" , "m2.py" ,
            "-reducer" , "r2.py" ,
            "-input" , "in2.txt" ,
            "-output" , "/out2/"
        };
        JobConf job2Conf = StreamJob.createJob( job2args);
        Job job2 = new Job( job2Conf);
        jobc.addJob( job2);

        //job3 reads the part files that job2 writes under /out2/
        String[] job3args = new String[]
        {
            "-mapper" , "m3.py" ,
            "-reducer" , "r3.py" ,
            "-input" , "/out2/par*" ,
            "-output" , "/out3/"
        };
        JobConf job3Conf = StreamJob.createJob( job3args);
        Job job3 = new Job( job3Conf);
        job3.addDependingJob( job2);
        jobc.addJob( job3);
        //code here

        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        // ToolRunner handles generic command line options
        int res = ToolRunner.run( new Configuration(), new TestChain(), args);
        System.exit( res);
    }//end main

}//end TestChain
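Adding jobs and dependencies only describes the group; nothing runs yet. JobControl implements Runnable, so we execute the group on a separate Thread and poll allFinished():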
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;

public class TestChain extends Configured implements Tool
{
    public int run( String[] args) throws Exception
    {
        //TestChain below is an arbitrary name for the group
        JobControl jobc = new JobControl( "TestChain");

        String[] job1args = new String[]
        {
            "-mapper" , "m1.py" ,
            "-reducer" , "r1.py" ,
            "-input" , "in1.txt" ,
            "-output" , "/out1/"
        };
        JobConf job1Conf = StreamJob.createJob( job1args);
        Job job1 = new Job( job1Conf);
        jobc.addJob( job1);

        String[] job2args = new String[]
        {
            "-mapper" , "m2.py" ,
            "-reducer" , "r2.py" ,
            "-input" , "in2.txt" ,
            "-output" , "/out2/"
        };
        JobConf job2Conf = StreamJob.createJob( job2args);
        Job job2 = new Job( job2Conf);
        jobc.addJob( job2);

        //job3 reads the part files that job2 writes under /out2/
        String[] job3args = new String[]
        {
            "-mapper" , "m3.py" ,
            "-reducer" , "r3.py" ,
            "-input" , "/out2/par*" ,
            "-output" , "/out3/"
        };
        JobConf job3Conf = StreamJob.createJob( job3args);
        Job job3 = new Job( job3Conf);
        job3.addDependingJob( job2);
        jobc.addJob( job3);

        //JobControl implements Runnable; run the group in its own thread
        Thread runjobc = new Thread( jobc);
        runjobc.start();

        while( !jobc.allFinished())
        {
            //do whatever you want; just wait or ask for job information
            Thread.sleep( 5000);
        }
        //tell the JobControl thread to terminate
        jobc.stop();

        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        // ToolRunner handles generic command line options
        int res = ToolRunner.run( new Configuration(), new TestChain(), args);
        System.exit( res);
    }//end main

}//end TestChain
This part discusses some errors that may occur. In the error messages below there is a class called OptimizingJoins. This class exists only to demonstrate the various errors and has nothing to do with this discussion.
A "package does not exist" error at compile time.
This is a classpath issue. For example, the hadoop-streaming-1.0.3.jar package has to be added at compile time:
javac -classpath /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar:/usr/local/hadoop/hadoop-core-1.0.3.jar TestChain.java
Add any missing package in the same way.
java.lang.NoClassDefFoundError: org/apache/hadoop/streaming/StreamJob
The full error is:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/streaming/StreamJob
at OptimizingJoins.run(OptimizingJoins.java:135)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at OptimizingJoins.main(OptimizingJoins.java:248)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.streaming.StreamJob
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
... 8 more
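This error appears when we run our jar with a command like: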
$HADOOP_HOME/bin/hadoop jar /home/hduser/TestChain.jar TestChain
The problem here is that the JVM in which our jar file runs cannot find StreamJob. To solve it, when we create the jar file we put inside it a manifest file that contains the classpath of StreamJob. Practically:
MANIFEST.MF
Manifest-Version: 1.0
Class-Path: /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar
Created-By: 1.7.0_07 (Oracle Corporation)
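and we build the jar including this manifest: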
jar cmf META-INF/MANIFEST.MF TestChain.jar TestChain.class
ERROR streaming.StreamJob: Unrecognized option: -D
This error occurs because StreamJob cannot parse the -D option. StreamJob can parse only streaming, job-specific options; -D is a generic option.
There are two solutions. The first solution is to use the -jobconf option instead of -D. The second solution is to have the -D option parsed by a GenericOptionsParser object (via ToolRunner), in which case the -D options have to be removed from the StreamJob.createJob() args.
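As a sketch of the first solution (reusing the hypothetical m1.py/r1.py job from above), the property is passed through -jobconf, a streaming option that StreamJob parses itself:

//First solution (sketch): -jobconf is a streaming option,
//so StreamJob can parse it without a GenericOptionsParser
String[] job1 = new String[]
{
    "-mapper" , "m1.py" ,
    "-reducer" , "r1.py" ,
    "-input" , "in1.txt" ,
    "-output" , "/out1/" ,
    "-jobconf" , "mapred.reduce.tasks=1"
};
JobConf job1Conf = StreamJob.createJob( job1);
JobClient.runJob( job1Conf);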
As an example, a "clean" implementation of the second solution is:
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;

public class TestChain
{
    public class Job1 extends Configured implements Tool
    {
        public int run( String[] args) throws Exception
        {
            String[] job1 = new String[]
            {
                "-mapper" , "m1.py" ,
                "-reducer" , "r1.py" ,
                "-input" , "in1.txt" ,
                "-output" , "/out1/"
            };
            JobConf job1Conf = StreamJob.createJob( job1);
            JobClient.runJob( job1Conf);
            return 0;
        }//end run
    }

    public class Job2 extends Configured implements Tool
    {
        public int run( String[] args) throws Exception
        {
            String[] job2 = new String[]
            {
                "-mapper" , "m2.py" ,
                "-reducer" , "r2.py" ,
                "-input" , "in2.txt" ,
                "-output" , "/out2/"
            };
            JobConf job2Conf = StreamJob.createJob( job2);
            JobClient.runJob( job2Conf);
            return 0;
        }//end run
    }

    public class Job3 extends Configured implements Tool
    {
        public int run( String[] args) throws Exception
        {
            String[] job3 = new String[]
            {
                "-mapper" , "m3.py" ,
                "-reducer" , "r3.py" ,
                "-input" , "in3.txt" ,
                "-output" , "/out3/"
            };
            JobConf job3Conf = StreamJob.createJob( job3);
            JobClient.runJob( job3Conf);
            return 0;
        }//end run
    }

    public static void main( String[] args) throws Exception
    {
        TestChain tc = new TestChain();

        //Domination
        String[] j1args = new String[]
        {
            "-D", "mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator",
            "-D", "mapred.text.key.comparator.options=-k1,1" ,
            "-D", "mapred.reduce.tasks=1"
        };
        // Let ToolRunner handle generic command-line options
        int j1res = ToolRunner.run( new Configuration(), tc.new Job1(), j1args);

        //Cost evaluation
        String[] j2args = new String[]
        {
            "-D", "mapred.reduce.tasks=12" ,
            "-D", "mapred.text.key.partitioner.options=-k1,1"
        };
        // Let ToolRunner handle generic command-line options
        int j2res = ToolRunner.run( new Configuration(), tc.new Job2(), j2args);

        //Minimum Cost
        String[] j3args = new String[]
        {
            "-D", "mapred.reduce.tasks=1"
        };
        // Let ToolRunner handle generic command-line options
        int j3res = ToolRunner.run( new Configuration(), tc.new Job3(), j3args);

        System.exit( j3res);
    }
}//end TestChain
javac -classpath /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar:/usr/local/hadoop/hadoop-core-1.0.3.jar TestChain.java
jar cmf META-INF/MANIFEST.MF TestChain.jar TestChain.class TestChain\$Job1.class TestChain\$Job2.class TestChain\$Job3.class
ERROR security.UserGroupInformation: PriviledgedActionException as:hduser cause:org.apache.hadoop.mapred.InvalidInputException: Input Pattern hdfs://localhost:54310/user/hduser/whateverFile matches 0 files
This error occurs when we use JobControl, e.g. when the input of a job is the output of a previous job. If this input file does not exist yet, the error occurs. JobControl runs all independent jobs in parallel, not one after another as JobClient does. So JobControl tries to run a job whose input files do not exist yet, and that job fails.
To avoid this situation, we declare that there is a dependency between the two jobs with x.addDependingJob(y), where job x depends on job y. Now JobControl does not try to run dependent jobs in parallel.
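A minimal sketch of this fix, reusing the job2/job3 objects of the JobControl example above (where job3 reads /out2/par*, the output of job2):

//job3 consumes job2's output, so declare the dependency;
//JobControl then holds job3 back until job2 has finished
Job job2 = new Job( job2Conf);
Job job3 = new Job( job3Conf);
job3.addDependingJob( job2);
jobc.addJob( job2);
jobc.addJob( job3);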