Hadoop流式处理:作业链

16

问题是如何在Hadoop中使用Hadoop Streaming(仅)链接作业。


10
文档写得不错,但问题究竟是什么? :) - Lorand Bendig
1
请查看http://incubator.apache.org/oozie/,也许对您有所帮助。 - octo
@octo,感谢提供链接。我的意图是仅使用Hadoop Streaming,因为我首先想坚持使用Hadoop。 - vpap
@Lorand Bendig,问题是如何在Hadoop中使用Hadoop Streaming链接作业。我更喜欢直接提供文档,而不是先问问题再回答。感谢您的“赞美”。 - vpap
1
一个 Bash 脚本肯定能完成工作,但正如 @octo 所提到的,Oozie 是最干净的方式。它支持流处理:https://github.com/yahoo/oozie/blob/master/examples/src/main/apps/streaming/workflow.xml - Lorand Bendig
@Lorand Bending,感谢提供的链接。我更倾向于坚持使用Hadoop,虽然这并不是一个完美的解决方案。我只是建议采用Hadoop解决方案。至于选择Oozie或其他工具,我将留给读者自行决定。 - vpap
1个回答

1
这个答案是提问者实际提出的问题。通常我会引用它,但由于它太长,我会放弃引用。
这是一份关于如何使用Hadoop Streaming(目前版本为1.0.3)链接两个或多个流作业的文档。
为了理解最终进行链接的代码,并能够编写任何其他链式作业,需要一些初步但实用的理论知识。
首先,什么是Hadoop中的作业?Hadoop作业是...
hadoopJob = Configuration + Execution

在哪里,

配置:所有使执行成为可能的设置。

执行:完成所需任务的可执行文件或脚本文件集合。换句话说,就是我们任务的映射和归约步骤。

Configuration = hadoopEnvironment + userEnvironment

其中,

hadoopEnvironment:是Hadoop通用环境的设置。这个通用环境是从资源中定义的,即位于$HADOOP_HOME/conf目录中的xml文件。例如,一些资源是core-site.xml、mapred-site.xml和hadoop-site.xml,它们分别定义了hdfs临时目录、作业跟踪器和集群节点数。

userEnvrironment:是运行作业时用户指定的参数。在Hadoop中,这些参数被称为选项。

userEnvironment = genericOptions + streamingOptions

其中,

genericOptions:它们是通用的,适用于每个流作业,与作业无关。它们由GenericsOptionsParser处理。

streamingOptions:它们是特定于作业的,适用于某个作业。例如,每个作业都有自己的输入和输出目录或文件。它们由StreamJob处理。

图示如下:

                            hadoopJob
                               /\
                              /  \
                             /    \
                            /      \
                           /        \
                Configuration       Execution
                     /\                 |
                    /  \                |
                   /    \   executable or script files
                  /      \
                 /        \
                /          \
  hadoopEnvironment     userEnvironment
           |                   /\
           |                  /  \
           |                 /    \ 
    $HADOOP_HOME/conf       /      \
                           /        \   
                genericOptions   streamingOptions
                      |                 |
                      |                 |
            GenericOptionsParser    StreamJob

作为任何人都可以看到的,以上所有内容都是一系列配置。其中一部分是集群管理员(hadoopEnvironment)的,另一部分是集群用户(userEnvironment)的。总之,工作主要是一个抽象层面上的配置,如果我们暂时忘记执行部分的话。
我们的代码应该处理以上所有内容。现在我们准备好编写代码了。
首先,在代码层面上,什么是Hadoop作业?它是一个jar文件。每当我们提交一个作业时,我们会提交一个带有一些命令行参数的jar文件。例如,当我们运行单个流式作业时,我们执行以下命令。
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar -D mapred.reduce.tasks=1 -mapper m.py -reducer r.py -input /in.txt -output /out/

我们的工作是使用hadoop-streaming-1.0.3.jar处理-D mapred.reduce.tasks=1 -mapper m.py -reducer r.py -input /in.txt -output /out/命令行参数。在这个jar包中,有一个类来负责一切。因此,我们打开一个新的Java文件,比如说TestChain.java。
// import everything needed

public class TestChain
{
    //code here
    public static void main( String[] args) throws Exception
    {
        //code here
    }//end main

}//end TestChain

为了处理hadoop环境,我们的类应该继承类Configured。类Configured使我们能够访问Hadoop的环境和参数,即前面提到的资源。这些资源是包含以名称/值对形式表示的数据的xml文件。
接下来,每个接口或多或少都是外部世界和任务之间的媒介。也就是说,接口是我们用来完成任务的工具。因此,我们的类必须实现Tool接口,该接口声明了一个run()方法。当当然实现了该接口时,此方法定义了我们的工具行为。最后,为了使用我们的工具,我们使用类ToolRunner。通过类GenericOptionsParser,ToolRunner还可以帮助处理用户环境中的通用选项。
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
// import everything else needed

public class TestChain extends Configured implements Tool
{
    public int run( String[] args) throws Exception
    {
        //code here
        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        // ToolRunner handles generic command line options  
        int res = ToolRunner.run( new Configuration(), new TestChain(), args);
        System.exit( res);
    }//end main

}//end TestChain

为了完整地呈现,方法run()也被称为驱动程序,它设置作业并包括初始化和配置作业。请注意,我们通过方法ToolRunner.run()的第一个参数'new Configuration'委托给ToolRunner处理hadoopEnnvironment。
到目前为止,我们只是设置了工具将运行的环境。现在我们需要定义我们的工具,即执行链接操作。
由于每个链式作业都是流式作业,因此我们将每个作业都创建为流式作业。我们使用类StreamJob的StreamJob.createJob(String[] args)方法来完成这一点。字符串args矩阵包含每个作业的“命令行”参数。这些命令行参数是用户环境的streamingOptions(作业特定)的参考。此外,这些参数采用参数/值对的形式。例如,如果我们的作业有输入文件in.txt,输出目录/out/,映射器m.py和减速器r.py,则...
String[] example = new String[]
{
    "-mapper"   , "m.py"
    "-reducer"  , "r.py"
    "-input"    , "in.txt"
    "-output"   , "/out/"
}

你需要注意两点。首先,"-"是必需的。它是区分参数和值的小东西。在这里,mapper是一个参数,m.py是它的值。通过"-"可以理解它们之间的不同。其次,如果在参数的左引号和"-"之间添加空格,则该参数将被忽略。如果我们有" -mapper",那么" -mapper"不会被视为参数。当StreamJob解析args矩阵时,会查找参数/值对。最后一件事,记住一个任务大致上是一个配置。我们期望StreamJob.createJob()返回一个配置或类似于此的东西。实际上,StreamJob.createJob()返回一个JobConf对象。简而言之,JobConf对象是Hadoop能够理解和执行的特定mapreduce作业的描述。
假设我们有三个要链接的作业,
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;
// import everything else needed

public class TestChain extends Configured implements Tool
{
    public int run( String[] args) throws Exception
    {
        String[] job1 = new String[]
        {
            "-mapper"   , "m1.py"
            "-reducer"  , "r1.py"
            "-input"    , "in1.txt"
            "-output"   , "/out1/"
        }
        JobConf job1Conf = new StreamJob.createJob( job1);
        //code here

        String[] job2 = new String[]
        {
            "-mapper"   , "m2.py"
            "-reducer"  , "r2.py"
            "-input"    , "in2.txt"
            "-output"   , "/out2/"
        }
        JobConf job2Conf = new StreamJob.createJob( job2);
        //code here

        String[] job3 = new String[]
        {
            "-mapper"   , "m3.py"
            "-reducer"  , "r3.py"
            "-input"    , "in3.txt"
            "-output"   , "/out3/"
        }
        JobConf job3Conf = new StreamJob.createJob( job3);
        //code here

        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        // ToolRunner handles generic command line options  
        int res = ToolRunner.run( new Configuration(), new TestChain(), args);
        System.exit( res);
    }//end main

}//end TestChain

在此,我们设置了工具运行的环境并定义了其行为。但是,我们还没有将其投入使用。ToolRunner不够用。ToolRunner会将我们的工具作为一个整体运行,而不是运行单个链式作业。我们需要自己来完成这个任务。
有两种方法可以实现。第一种方法是使用JobClient,第二种方法是使用JobControl
第一种方法 - JobClient
使用JobClient,我们按顺序运行链式作业,通过调用每个作业的JobClient来一个接一个地运行。运行每个单独作业的方法是JobClient.runJob(jobtorun),其中jobtorun是一个JobConf对象。
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;

public class TestChain extends Configured implements Tool
{
    public int run( String[] args) throws Exception
    {
        String[] job1 = new String[]
        {
            "-mapper"   , "m1.py"
            "-reducer"  , "r1.py"
            "-input"    , "in1.txt"
            "-output"   , "/out1/"
        }
        JobConf job1Conf = new StreamJob.createJob( job1);
        JobClient.runJob( job1Conf);

        String[] job2 = new String[]
        {
            "-mapper"   , "m2.py"
            "-reducer"  , "r2.py"
            "-input"    , "in2.txt"
            "-output"   , "/out2/"
        }
        JobConf job2Conf = new StreamJob.createJob( job2);
        JobClient.runJob( job2Conf);

        String[] job3 = new String[]
        {
            "-mapper"   , "m3.py"
            "-reducer"  , "r3.py"
            "-input"    , "in3.txt"
            "-output"   , "/out3/"
        }
        JobConf job3Conf = new StreamJob.createJob( job3);
        JobClient.runJob( job3Conf);

        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        // ToolRunner handles generic command line options  
        int res = ToolRunner.run( new Configuration(), new TestChain(), args);
        System.exit( res);
    }//end main

}//end TestChain

使用JobClient的优点是作业进度会在标准输出中打印出来。
JobClient的缺点是它不能处理作业之间的依赖关系。
第二种方式- JobControl
使用JobControl,所有链式作业都是作业组的一部分。在这里,每个作业都在该组的框架内执行。这意味着必须首先将每个链式作业添加到组中,然后组才能运行。该组是FIFO或组中每个作业的执行遵循FCFS(先进先出)模式。可以通过方法JobControl.addJob(jobtoadd)将每个作业添加到组中。
JobControl可以通过方法x.addDependingJob(y)处理依赖关系,其中作业x依赖于作业y。这意味着作业x在作业y完成之前无法运行。如果作业x从作业y和z两个作业中都依赖,并且z与y无关,则可以通过x.addDependingJob(y)和x.addDependingJob(z)表示这些依赖关系。

与JobClient相反,JobControl与Job对象一起“工作”。例如,当我们调用x.addDependingJob(y)方法时,x和y都是Job对象。对于JobControl.addJob(jobtoadd)也是如此,jobtoadd是一个Job对象。每个Job对象都是从JobConf对象创建的。回到代码,我们有:

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;

public class TestChain extends Configured implements Tool
{
    public int run( String[] args) throws Exception
    {

        //TestChain below is an arbitrary name for the group
        JobControl jobc = new JobControl( "TestChain");

        String[] job1 = new String[]
        {
            "-mapper"   , "m1.py"
            "-reducer"  , "r1.py"
            "-input"    , "in1.txt"
            "-output"   , "/out1/"
        }
        JobConf job1Conf = new StreamJob.createJob( job1);
        Job job1 = new Job( job1conf);
        jobc.addJob( job1);

        String[] job2 = new String[]
        {
            "-mapper"   , "m2.py"
            "-reducer"  , "r2.py"
            "-input"    , "in2.txt"
            "-output"   , "/out2/"
        }
        JobConf job2Conf = new StreamJob.createJob( job2);
        Job job2 = new Job( job2conf);
        jobc.addJob( job2);

        String[] job3 = new String[]
        {
            "-mapper"   , "m3.py"
            "-reducer"  , "r3.py"
            "-input"    , "/out2/par*"
            "-output"   , "/out3/"
        }
        JobConf job3Conf = new StreamJob.createJob( job3);
        Job job3 = new Job( job3conf);
        job3.addDependingJob( job2);
        jobc.addJob( job3);

        //code here

        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        // ToolRunner handles generic command line options  
        int res = ToolRunner.run( new Configuration(), new TestChain(), args);
        System.exit( res);
    }//end main

}//end TestChain

在上面的代码中,请注意job3依赖于job2。正如您所看到的,job3的输入是job2的输出。这个事实是一个依赖关系。job3等待直到job2完成。
到目前为止,我们只是将链式作业添加到组中并描述了它们的依赖关系。我们需要最后一件事来运行这组作业。
蛮力方法是调用JobControl.run()方法。虽然这种方法可行,但存在问题。当链式作业完成时,整个作业仍将永远运行。一个正确的方法是从我们已经存在的作业线程中定义一个新的执行线程(当作业运行时)。然后我们可以等待链式作业完成,然后退出。在链式作业执行期间,我们可以询问作业执行信息,例如有多少作业已经完成,或者我们可以找出作业是否处于无效状态以及是哪个作业。
使用JobControl的这种方式的优点是可以处理可能存在的许多作业之间的依赖关系。
JobControl的一个缺点是作业进度不会在标准输出中打印出来,这使得它不太直观。无论作业成功还是失败,都没有任何有用的输出。您必须通过Hadoop的Web UI进行检查或在下面的while循环中添加一些代码来跟踪作业状态或所需的任何其他信息。最后,保留HTML标签。
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;

public class TestChain extends Configured implements Tool
{
    public int run( String[] args) throws Exception
    {

        //TestChain below is an arbitrary name for the group
        JobControl jobc = new JobControl( "TestChain");

        String[] job1 = new String[]
        {
            "-mapper"   , "m1.py"
            "-reducer"  , "r1.py"
            "-input"    , "in1.txt"
            "-output"   , "/out1/"
        }
        JobConf job1Conf = new StreamJob.createJob( job1);
        Job job1 = new Job( job1conf);
        jobc.addJob( job1);

        String[] job2 = new String[]
        {
            "-mapper"   , "m2.py"
            "-reducer"  , "r2.py"
            "-input"    , "in2.txt"
            "-output"   , "/out2/"
        }
        JobConf job2Conf = new StreamJob.createJob( job2);
        Job job2 = new Job( job2conf);
        jobc.addJob( job2);

        String[] job3 = new String[]
        {
            "-mapper"   , "m3.py"
            "-reducer"  , "r3.py"
            "-input"    , "/out2/par*"
            "-output"   , "/out3/"
        }
        JobConf job3Conf = new StreamJob.createJob( job3);
        Job job3 = new Job( job3conf);
        job3.addDependingJob( job2);
        jobc.addJob( job3);

        Thread runjobc = new Thread( jobc);
        runjobc.start();

        while( !jobc.allFinished())
        {
            //do whatever you want; just wait or ask for job information
        }

        return 0;
    }//end run

    public static void main( String[] args) throws Exception
    {
        // ToolRunner handles generic command line options  
        int res = ToolRunner.run( new Configuration(), new TestChain(), args);
        System.exit( res);
    }//end main

}//end TestChain

错误

本部分讨论可能发生的一些错误。在下面的错误消息中,有一个名为OptimizingJoins的类。这个类只是为了演示各种错误而存在,与本次讨论无关。

编译时出现包不存在的错误。

这是classpath的问题。例如,编译时需要添加hadoop-streaming-1.0.3.jar包。

javac -classpath /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar:/usr/local/hadoop/hadoop-core-1.0.3.jar TestChain.java

请添加任何缺失的包。

java.lang.NoClassDefFoundError: org/apache/hadoop/streaming/StreamJob

完整错误信息如下:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/streaming/StreamJob
at OptimizingJoins.run(OptimizingJoins.java:135)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at OptimizingJoins.main(OptimizingJoins.java:248)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.streaming.StreamJob
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
... 8 more

这涉及到我们jar文件的清单文件。如果我们按照上述方式编译作业,一切都很好。Java编译器会找到它需要的所有内容。但是,当我们通过命令在Hadoop中运行作业时,情况就不同了。
$HADOOP_HOME/bin/hadoop jar /home/hduser/TestChain.jar TestChain

如果我们运行的jar文件所在的JVM找不到StreamJob,为了解决这个问题,当我们创建jar文件时,需要在其中加入一个包含StreamJob类路径的清单文件。实际上,

MANIFEST.MF

Manifest-Version: 1.0
Class-Path: /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar
Created-By: 1.7.0_07 (Oracle Corporation)

请注意,MANIFEST.MF文件必须以空行结尾。我们的MANIFEST.MF文件有4行,而不是3行。然后我们创建jar文件,如下所示,
jar cmf META-INF/MANIFEST.MF TestChain.jar TestChain.class 

错误 streaming.StreamJob: 无法识别选项: -D

这个错误是由于StreamJob无法解析-D选项造成的。StreamJob只能解析流处理和作业特定的选项,而-D是一个通用选项。

有两个解决方案。第一个解决方案是使用-jobconf选项代替-D选项。第二个解决方案是通过GenericOptionsParser对象解析-D选项。在第二个解决方案中,你需要从StreamJob.createJob() args中删除-D选项。

举个例子,第二个解决方案的"干净"代码实现如下:

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.streaming.StreamJob;

public class TestChain
{
    public class Job1 extends Configured implements Tool
    {
        public int run( String[] args) throws Exception
        {
            String[] job1 = new String[]
            {
                "-mapper"   , "m1.py"
                "-reducer"  , "r1.py"
                "-input"    , "in1.txt"
                "-output"   , "/out1/"
            }
            JobConf job1Conf = new StreamJob.createJob( job1);
            JobClient.runJob( job1Conf);

            return 0;
        }//end run
    }

    public class Job2 extends Configured implements Tool
    {
        public int run( String[] args) throws Exception
        {
            String[] job2 = new String[]
            {
                "-mapper"   , "m2.py"
                "-reducer"  , "r2.py"
                "-input"    , "in2.txt"
                "-output"   , "/out2/"
            }
            JobConf job2Conf = new StreamJob.createJob( job2);
            JobClient.runJob( job2Conf);

            return 0;
        }//end run
    }

    public class Job3 extends Configured implements Tool
    {
        public int run( String[] args) throws Exception
        {
            String[] job3 = new String[]
            {
                "-mapper"   , "m3.py"
                "-reducer"  , "r3.py"
                "-input"    , "in3.txt"
                "-output"   , "/out3/"
            }
            JobConf job3Conf = new StreamJob.createJob( job3);
            JobClient.runJob( job3Conf);

            return 0;
        }//end run
    }


    public static void main( String[] args) throws Exception
    {
        TestChain tc = new TestChain();

        //Domination
        String[] j1args = new String[]
        {
            "-D", "mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator",
            "-D", "mapred.text.key.comparator.options=-k1,1"    ,
            "-D", "mapred.reduce.tasks=1"
        };

        // Let ToolRunner handle generic command-line options   
        int j1res = ToolRunner.run( new Configuration(), tc.new Job1(), j1args);

        //Cost evaluation
        String[] j2rgs = new String[]
        {
            "-D", "mapred.reduce.tasks=12 "                 ,
            "-D", "mapred.text.key,partitioner.options=-k1,1"
        };

        // Let ToolRunner handle generic command-line options   
        int j2res = ToolRunner.run( new Configuration(), tc.new Job2(), j2args);

        //Minimum Cost
        String[] j3args = new String[]
        {
            "-D", "mapred.reduce.tasks=1"
        };

        // Let ToolRunner handle generic command-line options   
        int j3res = ToolRunner.run( new Configuration(), tc.new Job1(), j3args);
        System.exit( mres);
    }
}//end TestChain    

在上面的代码中,我们定义了一个全局类TestChain,它封装了链式作业。然后,我们定义了每个单独的链式作业,即我们定义了它的运行方法。每个链式作业都是一个继承Configured并实现Tool的类。最后,在TestChain的主方法中,我们按顺序运行每个作业。请注意,在运行任何链式作业之前,我们定义了其通用选项。
编译
javac -classpath /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar:/usr/local/hadoop/hadoop-core-1.0.3.jar TestChain.java 

罐子
jar cmf META-INF/MANIFEST.MF TestChain.jar TestChain.class TestChain\$Dom.class TestChain\$Cost.class TestChain\$Min.class 

错误 security.UserGroupInformation: PriviledgedActionException as:hduser cause:org.apache.hadoop.mapred.InvalidInputException: 输入模式hdfs://localhost:54310/user/hduser/whateverFile匹配0个文件

当我们使用JobControl时,会出现此错误。例如,如果作业的输入是先前作业的输出,则如果此输入-输出文件不存在,则会发生此错误。 JobControl 并行运行所有独立的作业,而不像JobClient一样一个接一个运行。因此,Jobcontrol尝试运行其输入文件不存在的作业,因此失败。

为避免这种情况,我们声明两个作业之间存在依赖关系,使用x.addDependingJob(y) ,其中作业x依赖于作业y。现在,JobControl 不会尝试并行运行依赖作业。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接