Implementing min, max, and count with MapReduce

I have developed a MapReduce application, following the book by Donald Miner, that determines a user's first and last comment and that user's total number of comments. The problem with my algorithm is in the reducer. I group the comments by user ID; my test data contains two user IDs, each of which posted 3 comments on different dates, for 6 rows in total. So my reducer should emit two records, each showing the user's first and last comment along with that user's total comment count. Instead, my reducer is emitting six records. Can anyone point out what is wrong with the code below?
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.arjun.mapreduce.patterns.mapreducepatterns.MRDPUtils;


public class MinMaxCount {

    public static class MinMaxCountMapper extends 
            Mapper<Object, Text, Text, MinMaxCountTuple> {

        private Text outuserId = new Text();
        private MinMaxCountTuple outTuple = new MinMaxCountTuple();

        private final static SimpleDateFormat sdf = 
                     new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSS");

        @Override
        protected void map(Object key, Text value,
                org.apache.hadoop.mapreduce.Mapper.Context context)
                throws IOException, InterruptedException {

            Map<String, String> parsed = 
                     MRDPUtils.transformXMLtoMap(value.toString());

            String date = parsed.get("CreationDate");
            String userId = parsed.get("UserId");

            try {
                Date creationDate = sdf.parse(date);
                outTuple.setMin(creationDate);
                outTuple.setMax(creationDate);
            } catch (java.text.ParseException e) {
                System.err.println("Unable to parse Date in XML");
                System.exit(3);
            }

            outTuple.setCount(1);
            outuserId.set(userId);

            context.write(outuserId, outTuple);

        }

    }

    public static class MinMaxCountReducer extends 
            Reducer<Text, MinMaxCountTuple, Text, MinMaxCountTuple> {

        private MinMaxCountTuple result = new MinMaxCountTuple();


        protected void reduce(Text userId, Iterable<MinMaxCountTuple> values,
                org.apache.hadoop.mapreduce.Reducer.Context context)
                throws IOException, InterruptedException {

            result.setMin(null);
            result.setMax(null);
            result.setCount(0);
            int sum = 0;
            int count = 0;
            for(MinMaxCountTuple tuple: values )
            {
                if(result.getMin() == null || 
                        tuple.getMin().compareTo(result.getMin()) < 0) 
                {
                    result.setMin(tuple.getMin());
                }

                if(result.getMax() == null ||
                        tuple.getMax().compareTo(result.getMax()) > 0)  {
                    result.setMax(tuple.getMax());
                }

                System.err.println(count++);

                sum += tuple.getCount();
            }

            result.setCount(sum);
            context.write(userId, result);
        }

    }

    /**
     * @param args
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String [] otherArgs = new GenericOptionsParser(conf, args)
                            .getRemainingArgs();
        if(otherArgs.length < 2 )
        {
            System.err.println("Usage: MinMaxCount input output");
            System.exit(2);
        }


        Job job = new Job(conf, "Summarization min max count");
        job.setJarByClass(MinMaxCount.class);
        job.setMapperClass(MinMaxCountMapper.class);
        //job.setCombinerClass(MinMaxCountReducer.class);
        job.setReducerClass(MinMaxCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MinMaxCountTuple.class);

        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        boolean result = job.waitForCompletion(true);
        if(result)
        {
            System.exit(0);
        }else {
            System.exit(1);
        }

    }

}

Input: 
<row Id="8189677" PostId="6881722" Text="Have you looked at Hadoop?" CreationDate="2011-07-30T07:29:33.343" UserId="831878" />
<row Id="8189677" PostId="6881722" Text="Have you looked at Hadoop?" CreationDate="2011-08-01T07:29:33.343" UserId="831878" />
<row Id="8189677" PostId="6881722" Text="Have you looked at Hadoop?" CreationDate="2011-08-02T07:29:33.343" UserId="831878" />
<row Id="8189678" PostId="6881722" Text="Have you looked at Hadoop?" CreationDate="2011-06-30T07:29:33.343" UserId="931878" />
<row Id="8189678" PostId="6881722" Text="Have you looked at Hadoop?" CreationDate="2011-07-01T07:29:33.343" UserId="931878" />
<row Id="8189678" PostId="6881722" Text="Have you looked at Hadoop?" CreationDate="2011-08-02T07:29:33.343" UserId="931878" />

Output file contents (part-r-00000):

831878  2011-07-30T07:29:33.343 2011-07-30T07:29:33.343 1
831878  2011-08-01T07:29:33.343 2011-08-01T07:29:33.343 1
831878  2011-08-02T07:29:33.343 2011-08-02T07:29:33.343 1
931878  2011-06-30T07:29:33.343 2011-06-30T07:29:33.343 1
931878  2011-07-01T07:29:33.343 2011-07-01T07:29:33.343 1
931878  2011-08-02T07:29:33.343 2011-08-02T07:29:33.343 1

Job submission output:


12/12/16 11:13:52 INFO input.FileInputFormat: Total input paths to process : 1
12/12/16 11:13:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12/12/16 11:13:52 WARN snappy.LoadSnappy: Snappy native library not loaded
12/12/16 11:13:52 INFO mapred.JobClient: Running job: job_201212161107_0001
12/12/16 11:13:53 INFO mapred.JobClient:  map 0% reduce 0%
12/12/16 11:14:06 INFO mapred.JobClient:  map 100% reduce 0%
12/12/16 11:14:18 INFO mapred.JobClient:  map 100% reduce 100%
12/12/16 11:14:23 INFO mapred.JobClient: Job complete: job_201212161107_0001
12/12/16 11:14:23 INFO mapred.JobClient: Counters: 26
12/12/16 11:14:23 INFO mapred.JobClient:   Job Counters 
12/12/16 11:14:23 INFO mapred.JobClient:     Launched reduce tasks=1
12/12/16 11:14:23 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=12264
12/12/16 11:14:23 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
12/12/16 11:14:23 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
12/12/16 11:14:23 INFO mapred.JobClient:     Launched map tasks=1
12/12/16 11:14:23 INFO mapred.JobClient:     Data-local map tasks=1
12/12/16 11:14:23 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=10124
12/12/16 11:14:23 INFO mapred.JobClient:   File Output Format Counters 
12/12/16 11:14:23 INFO mapred.JobClient:     Bytes Written=342
12/12/16 11:14:23 INFO mapred.JobClient:   FileSystemCounters
12/12/16 11:14:23 INFO mapred.JobClient:     FILE_BYTES_READ=204
12/12/16 11:14:23 INFO mapred.JobClient:     HDFS_BYTES_READ=888
12/12/16 11:14:23 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=43479
12/12/16 11:14:23 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=342
12/12/16 11:14:23 INFO mapred.JobClient:   File Input Format Counters 
12/12/16 11:14:23 INFO mapred.JobClient:     Bytes Read=761
12/12/16 11:14:23 INFO mapred.JobClient:   Map-Reduce Framework
12/12/16 11:14:23 INFO mapred.JobClient:     Map output materialized bytes=204
12/12/16 11:14:23 INFO mapred.JobClient:     Map input records=6
12/12/16 11:14:23 INFO mapred.JobClient:     Reduce shuffle bytes=0
12/12/16 11:14:23 INFO mapred.JobClient:     Spilled Records=12
12/12/16 11:14:23 INFO mapred.JobClient:     Map output bytes=186
12/12/16 11:14:23 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
12/12/16 11:14:23 INFO mapred.JobClient:     Combine input records=0
12/12/16 11:14:23 INFO mapred.JobClient:     SPLIT_RAW_BYTES=127
12/12/16 11:14:23 INFO mapred.JobClient:     Reduce input records=6
12/12/16 11:14:23 INFO mapred.JobClient:     Reduce input groups=2
12/12/16 11:14:23 INFO mapred.JobClient:     Combine output records=0
12/12/16 11:14:23 INFO mapred.JobClient:     Reduce output records=6
12/12/16 11:14:23 INFO mapred.JobClient:     Map output records=6

Could you post the input data you are using in the original question (rather than in a comment)? - Chris White
https://github.com/adamjshook/mapreducepatterns/blob/master/MRDP/src/main/java/mrdp/ch2/MinMaxCountDriver.java <--- the original code, if you're interested - Donald Miner
Thank you, Donald Miner. Yes, I am indeed following your book, and the recipe above is from it. That is the sample data I supplied to the job. - user1207659
From the input you've provided, it should output only 1 record, since all the records have the same user ID (931878). - Chris White
To expand on my earlier comment, add the @Override annotation to the reduce method (as you did in your mapper class) to make sure the reducer actually overrides the parent class's method. - Adam Shook
1 Answer

Ah, found the culprit. Just change the signature of the reduce method to the following:

    protected void reduce(Text userId, Iterable<MinMaxCountTuple> values,
            Context context) throws IOException, InterruptedException {

Basically, you just need to use `Context` instead of `org.apache.hadoop.mapreduce.Reducer.Context`.
Now the output looks like this:
831878  2011-07-30T07:29:33.343 2011-08-02T07:29:33.343 3
931878  2011-06-30T07:29:33.343 2011-08-02T07:29:33.343 3

I tested it locally for you, and this change fixed the problem. It is odd behavior, though, and it would be great if someone could explain it. It has something to do with generics: when you use org.apache.hadoop.mapreduce.Reducer.Context, the compiler warns:
"Reducer.Context is a raw type. References to generic type Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>.Context should be parameterized"

But when you use just `Context`, it is fine.
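The six-record output is consistent with the inherited default `Reducer.reduce` running instead of the subclass's method: Hadoop's default implementation is the identity function, writing every input value back out unchanged. The following self-contained sketch (a hypothetical `FakeReducer`, not Hadoop's actual API; the mismatch here is a `List`-vs-`Iterable` parameter, standing in for the raw `Reducer.Context` type in the question) shows how a reduce method that fails to override silently falls back to that identity behavior:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IdentityReduceDemo {

    // Minimal stand-in for Hadoop's Reducer: like Hadoop's, the default
    // reduce() is the identity function and writes every value back out.
    static class FakeReducer<K, V> {
        final List<String> output = new ArrayList<>();
        protected void reduce(K key, Iterable<V> values) {
            for (V v : values) {
                output.add(key + "\t" + v); // one output record per input value
            }
        }
    }

    // The parameter type List does not match the inherited Iterable, so this
    // method is an overload, not an override -- analogous to declaring the
    // parameter with the raw Reducer.Context type in the question.
    static class BrokenReducer extends FakeReducer<String, Integer> {
        protected void reduce(String key, List<Integer> values) {
            int sum = 0;
            for (int v : values) sum += v;
            output.add(key + "\t" + sum); // never called by the "framework"
        }
    }

    // The "framework" only knows the base type; dynamic dispatch finds no
    // override, so the inherited identity reduce() runs.
    static List<String> runJob(FakeReducer<String, Integer> r,
                               String key, Iterable<Integer> values) {
        r.reduce(key, values);
        return r.output;
    }

    public static void main(String[] args) {
        // Three values for one key yield three records instead of one
        // summary record -- the same symptom as the six-record job output.
        System.out.println(runJob(new BrokenReducer(), "831878",
                Arrays.asList(1, 1, 1)));
    }
}
```

Adding `@Override` to the intended reduce method, as suggested in the comments, turns this silent fallback into a compile-time error ("method does not override or implement a method from a supertype").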


Why is that? What is the difference between using `org.apache.hadoop.mapreduce.Reducer.Context` and `Context`? I'm very curious too.... - LazerSharks
