Apache Spark - reducebyKey - Java -

5

我正在尝试使用Java作为编程语言,了解Spark中reduceByKey的工作原理。

假设我有一个句子“我是谁,我就是我”。 我将该句子分解为单词并将其存储为列表[I, am, who, I, am]

现在,该函数将每个单词分配为1

JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
    }
});

因此,输出结果如下:
(I,1) 
(am,1)
(who,1)
(I,1)
(am,1)

现在,如果我有三个reducer运行,每个reducer都会获得一个键和与该键关联的值:
reducer 1:
    (I,1)
    (I,1)

reducer 2:
    (am,1)
    (am,1)

reducer 3:
    (who,1)

我希望了解:

a. 下面的函数到底发生了什么。
b. 参数new Function2<Integer, Integer, Integer>是什么意思。
c. JavaPairRDD 是如何形成的。

JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
    }
});

你的问题需要更具体,当你说“到底发生了什么”时,很难知道你想要什么样的解释。此外,你应该查看Scala API,它比Java API更简洁、更好用。你的代码可以变成:words.map((_, 1)).reduceByKey(_ + _) - samthebest
4个回答

6
我认为你的问题围绕在这里的reduce函数,它是一个返回1的2个参数函数,而在Reducer中,您实现的是一个多对多的函数。
这个API更简单,但不太通用。您提供了一个可以将任何2个值缩减到1的结合操作(例如,两个整数相加得到一个)。这用于将每个键的所有值缩减为1。提供N到1的函数并不必要,因为可以使用2到1的函数来完成。在这里,您不能为一个键发出多个值。
结果是每个(键,一堆值)的(键,缩减值)。
经典Hadoop MapReduce中的Mapper和Reducer实际上非常相似(只是一个接收值集合而不是键值对的值),并且让您实现许多模式。从某种意义上说,这是好的,从某种意义上说,这是浪费和复杂的。
您仍然可以重现Mappers和Reducers的工作,但Spark中的方法是mapPartitions,可能与groupByKey配对使用。这些是您可能考虑的最通用的操作,并且我并不是说您应该以这种方式在Spark中模仿MapReduce。实际上,这不太可能有效。但是这是可能的。

6
reduceByKey 的工作原理如下:
在一个 RDD 中,如果 Spark 找到具有相同键的元素,则 Spark 获取它们的值并对这些值执行某些操作,然后返回相同类型的值。例如,假设您有一个包含以下元素的 RDD:
[k,V1],[K,V2],其中 V1,V2 是相同类型的值。
那么新 Function2() 的参数可以有三个:
1.来自第一个 K,V 对的值部分,即 V1。 2.来自第二个 K,V 对的值部分,即 V2。 3.重写的 call 方法的返回类型,它再次是 V1 和 V2 类型(这可以是作为 call 方法的一部分提供的函数操作的结果)。
请注意,由于 RDD 分布在节点上,每个节点将执行自己的 reduce 操作,并将结果返回给主节点,然后主节点再对工作节点的结果进行最终 reduce 操作。
我想这解释了您的疑问。

0

reduceByKey,顾名思义,会对JavaPairRDD应用一个reduce操作,其中键是相同的。如果您参考文档,它说reduceByKey

使用可结合和交换的reduce函数合并每个键的值。

reduceByKey需要Function2接口的实现。Function2的语法是:Function2<T1, T2, R>,这里,输入参数的类型为T1和T2,输出参数的类型为R。

让我们通过您提到的示例来理解这一点

您想要应用reduceByKey的JavaPairRDD是:

(I,1) 
(am,1)
(who,1)
(I,1)
(am,1)

在你的JavaPairRDD中,键是第一个参数(在这种情况下是单词),值是第二个参数(每个单词分配1)。你想要应用reduceByKey以了解每个单词出现的次数。每当我们看到相同的单词时,我们就想要将JavaPairRDD的值相加。因此,为了将值相加,您需要两个输入参数,返回值将是一个参数。
因此,语法中的前两个整数表示输入,第三个整数表示输出。与Function2接口的语法相关联,T1和T2是整数,R也是整数。
回答问题c)
最终通过应用reduceByKey操作形成的JavaPairRDD将具有原始JavaPairRDD的,其中reduceByKey被应用,并且将是在Function2接口实现中计算的最终减少值。
如果您对这些功能接口的参数感到困惑,请使用此规则:输入参数将在输出参数后面在接口的语法声明中。
输入参数/参数将在函数的括号中,输出参数将是在函数名称之前提到的参数。
例如:
  1. 查看您提出的问题中PairFunction的声明。它是PairFunction<String,String,Integer>,相应的调用方法是Tuple2<String,Integer> call(String s)。因此,这里的输入是String,输出由String和Integer组成。
  2. 查看Function2接口的声明。它是Function2<Integer,Integer,Integer>,相应的调用方法是Integer call(Integer i1,Integer i2)。因此,输入是两个整数,输出是一个整数。
希望对您有所帮助。

-3
简而言之,请考虑以下内容:
输入:{(a:1),(b:2),(c:2),(a:3),(b:2),(c:3)} 将其传递给reduceByKey
输出:{(a:4),(b:4),(c:5)}

请问您能否详细解释一下这个回答是如何回答了问题a、b和c的呢? - Falko

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接