在Hadoop中链接多个MapReduce作业

127
在许多应用MapReduce的实际情况下,最终算法会涉及到多个MapReduce步骤。即 Map1、Reduce1、Map2、Reduce2 等等。因此,你需要将最后一个 reduce 的输出作为下一个 map 的输入。一般来说,成功完成流水线后,中间数据是不需要保留的。另外,由于这些中间数据通常是某种数据结构(如“map”或“set”),因此你不希望在编写和读取键值对时花费过多的精力。
那么,在 Hadoop 中有什么推荐的方法呢?是否有一个(简单)的例子展示了如何正确地处理这些中间数据,包括清理工作?

2
使用哪个MapReduce框架? - skaffman
1
我编辑了问题以澄清我在谈论Hadoop。 - Niels Basjes
我建议使用swineherd gem来完成这个任务:https://github.com/Ganglion/swineherd 最好的祝福,Tobias - Tobias
14个回答

59

我认为Yahoo的开发者网络上的这篇教程可以帮助你完成此操作:Chaining Jobs

您可以使用JobClient.runJob()方法。第一个作业的输出路径将成为第二个作业的输入路径。需要将其作为参数传递给您的作业,并编写适当的代码来解析它们并设置作业的参数。

我认为上面的方法可能是旧的mapred API的方法,但仍然可行。新的mapreduce API中也会有类似的方法,但我不确定它是什么。

至于在作业完成后删除中间数据,您可以在代码中实现。我以前使用的方法是像这样:

FileSystem.delete(Path f, boolean recursive);

路径指的是数据在HDFS上的位置。你需要确保只有在没有其他作业需要此数据时才删除它。


3
感谢提供雅虎教程的链接。如果两个程序需要在同一运行中,Chaining Jobs确实是您想要的。我想知道的是,如果您想要能够单独运行它们,最简便的方法是什么。在提到的教程中,我找到了SequenceFileOutputFormat“编写适合读取到后续MapReduce作业的二进制文件”和相应的SequenceFileInputFormat,这使得所有操作都非常容易完成。谢谢。 - Niels Basjes

22

有很多方法可以实现。

(1) 级联作业

为第一个作业创建JobConf对象"job1",并将所有参数设置为“input”为输入目录,“temp”为输出目录。执行此作业:

JobClient.run(job1).

在其正下方,创建名为“job2”的JobConf对象,将所有参数设置为“temp”作为输入目录,“output”作为输出目录。执行此作业:

JobClient.run(job2).

(2) 创建两个 JobConf 对象,并像(1)一样设置其中的所有参数,但不使用 JobClient.run。

然后使用 jobconfs 参数创建两个 Job 对象:

Job job1=new Job(jobconf1); 
Job job2=new Job(jobconf2);

通过使用jobControl对象,您可以指定作业之间的依赖关系,并运行这些作业:

JobControl jbcntrl=new JobControl("jbcntrl");
jbcntrl.addJob(job1);
jbcntrl.addJob(job2);
job2.addDependingJob(job1);
jbcntrl.run();

(3) 如果你需要类似于Map+ | Reduce | Map*这样的结构,你可以使用Hadoop 0.19版本及更高版本中提供的ChainMapper和ChainReducer类。


8

我使用JobConf对象依次完成了作业链。我使用了WordCount示例来连接这些作业。第一个作业计算给定输出中每个单词出现的次数。第二个作业将第一个作业的输出作为输入,计算出给定输入中的总单词数。以下是需要放置在Driver类中的代码。

    //First Job - Counts, how many times a word encountered in a given file 
    JobConf job1 = new JobConf(WordCount.class);
    job1.setJobName("WordCount");

    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);

    job1.setMapperClass(WordCountMapper.class);
    job1.setCombinerClass(WordCountReducer.class);
    job1.setReducerClass(WordCountReducer.class);

    job1.setInputFormat(TextInputFormat.class);
    job1.setOutputFormat(TextOutputFormat.class);

    //Ensure that a folder with the "input_data" exists on HDFS and contains the input files
    FileInputFormat.setInputPaths(job1, new Path("input_data"));

    //"first_job_output" contains data that how many times a word occurred in the given file
    //This will be the input to the second job. For second job, input data name should be
    //"first_job_output". 
    FileOutputFormat.setOutputPath(job1, new Path("first_job_output"));

    JobClient.runJob(job1);


    //Second Job - Counts total number of words in a given file

    JobConf job2 = new JobConf(TotalWords.class);
    job2.setJobName("TotalWords");

    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);

    job2.setMapperClass(TotalWordsMapper.class);
    job2.setCombinerClass(TotalWordsReducer.class);
    job2.setReducerClass(TotalWordsReducer.class);

    job2.setInputFormat(TextInputFormat.class);
    job2.setOutputFormat(TextOutputFormat.class);

    //Path name for this job should match first job's output path name
    FileInputFormat.setInputPaths(job2, new Path("first_job_output"));

    //This will contain the final output. If you want to send this jobs output
    //as input to third job, then third jobs input path name should be "second_job_output"
    //In this way, jobs can be chained, sending output one to other as input and get the
    //final output
    FileOutputFormat.setOutputPath(job2, new Path("second_job_output"));

    JobClient.runJob(job2);

运行这些作业的命令是:

bin/hadoop jar TotalWords。

我们需要为该命令指定最终作业的名称。在上面的示例中,名称是TotalWords。


7
实际上有几种方法可以实现这个目标,我将重点介绍其中两种。
一种是通过Riffle(http://github.com/cwensel/riffle),这是一个注解库,用于识别依赖关系,并按照依赖关系(拓扑顺序)'执行'它们。
或者您可以使用Cascading中的Cascade(和MapReduceFlow)(http://www.cascading.org/)。未来版本将支持Riffle注释,但现在使用原始MR JobConf作业效果很好。
这种方法的变体是根本不手动管理MR作业,而是使用Cascading API开发应用程序。然后,JobConf和作业链接由Cascading规划器和Flow类在内部处理。
这样,您就可以将时间集中在解决问题上,而不是在管理Hadoop作业等机制上。您甚至可以在其上添加不同的语言(如clojure或jruby),以进一步简化开发和应用程序。http://www.cascading.org/modules.html

6

您可以按照代码中提供的方式运行MR链。



请注意:仅提供了驱动程序代码。

public class WordCountSorting {
// here the word keys shall be sorted
      //let us write the wordcount logic first

      public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException {
            //THE DRIVER CODE FOR MR CHAIN
            Configuration conf1=new Configuration();
            Job j1=Job.getInstance(conf1);
            j1.setJarByClass(WordCountSorting.class);
            j1.setMapperClass(MyMapper.class);
            j1.setReducerClass(MyReducer.class);

            j1.setMapOutputKeyClass(Text.class);
            j1.setMapOutputValueClass(IntWritable.class);
            j1.setOutputKeyClass(LongWritable.class);
            j1.setOutputValueClass(Text.class);
            Path outputPath=new Path("FirstMapper");
            FileInputFormat.addInputPath(j1,new Path(args[0]));
                  FileOutputFormat.setOutputPath(j1,outputPath);
                  outputPath.getFileSystem(conf1).delete(outputPath);
            j1.waitForCompletion(true);
                  Configuration conf2=new Configuration();
                  Job j2=Job.getInstance(conf2);
                  j2.setJarByClass(WordCountSorting.class);
                  j2.setMapperClass(MyMapper2.class);
                  j2.setNumReduceTasks(0);
                  j2.setOutputKeyClass(Text.class);
                  j2.setOutputValueClass(IntWritable.class);
                  Path outputPath1=new Path(args[1]);
                  FileInputFormat.addInputPath(j2, outputPath);
                  FileOutputFormat.setOutputPath(j2, outputPath1);
                  outputPath1.getFileSystem(conf2).delete(outputPath1, true);
                  System.exit(j2.waitForCompletion(true)?0:1);
      }

}

序列是

(JOB1)MAP->REDUCE-> (JOB2)MAP
这样做是为了对键进行排序,但还有其他方法,比如使用treemap
然而,我想把你的注意力集中在作业链的方式上!!
谢谢


4

3

3
我们可以利用作业的waitForCompletion(true)方法来定义作业之间的依赖关系。
在我的场景中,我有3个相互依赖的作业。在驱动程序类中,我使用了下面的代码,并且它按预期工作。
public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub

        CCJobExecution ccJobExecution = new CCJobExecution();

        Job distanceTimeFraudJob = ccJobExecution.configureDistanceTimeFraud(new Configuration(),args[0], args[1]);
        Job spendingFraudJob = ccJobExecution.configureSpendingFraud(new Configuration(),args[0], args[1]);
        Job locationFraudJob = ccJobExecution.configureLocationFraud(new Configuration(),args[0], args[1]);

        System.out.println("****************Started Executing distanceTimeFraudJob ================");
        distanceTimeFraudJob.submit();
        if(distanceTimeFraudJob.waitForCompletion(true))
        {
            System.out.println("=================Completed DistanceTimeFraudJob================= ");
            System.out.println("=================Started Executing spendingFraudJob ================");
            spendingFraudJob.submit();
            if(spendingFraudJob.waitForCompletion(true))
            {
                System.out.println("=================Completed spendingFraudJob================= ");
                System.out.println("=================Started locationFraudJob================= ");
                locationFraudJob.submit();
                if(locationFraudJob.waitForCompletion(true))
                {
                    System.out.println("=================Completed locationFraudJob=================");
                }
            }
        }
    }

你的回答是关于如何在执行方面加入这些工作的。原始问题是关于最好的数据结构。因此,你的回答对于这个特定的问题不相关。 - Niels Basjes

2
新的类 org.apache.hadoop.mapreduce.lib.chain.ChainMapper 可以帮助实现这种场景。

1
答案很好 - 但您应该添加更多关于它的详细信息,或者至少提供一个API参考链接,以便人们可以点赞。 - Jeremy Hajek
ChainMapper和ChainReducer用于在Reduce之前有1个或多个Mapper,并在Reduce之后有0个或多个Mapper,规范为(Mapper+)Reduce(Mapper*)。如果我理解不正确,请纠正我,但我认为这种方法不能像OP要求的那样串行链接作业。 - oczkoisse

1
如果您想以编程方式链接作业,您需要使用JobControl。使用方法非常简单:
JobControl jobControl = new JobControl(name);

之后您需要添加ControlledJob实例。ControlledJob定义了一个带有依赖关系的作业,从而自动将输入和输出插入到一系列作业中。

    jobControl.add(new ControlledJob(job, Arrays.asList(controlledjob1, controlledjob2));

    jobControl.run();

开始链。您需要将其放入单独的线程中。这样可以在运行时检查您的链的状态:

    while (!jobControl.allFinished()) {
        System.out.println("Jobs in waiting state: " + jobControl.getWaitingJobList().size());
        System.out.println("Jobs in ready state: " + jobControl.getReadyJobsList().size());
        System.out.println("Jobs in running state: " + jobControl.getRunningJobList().size());
        List<ControlledJob> successfulJobList = jobControl.getSuccessfulJobList();
        System.out.println("Jobs in success state: " + successfulJobList.size());
        List<ControlledJob> failedJobList = jobControl.getFailedJobList();
        System.out.println("Jobs in failed state: " + failedJobList.size());
    }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接