为什么Spark的reduceByKey结果不一致

4

我正在尝试使用Scala通过Spark计算每行迭代的次数。
以下是我的输入:

1 vikram
2 sachin
3 shobit
4 alok
5 akul
5 akul
1 vikram
1 vikram
3 shobit
10 ashu
5 akul
1 vikram
2 sachin
7 vikram

现在我创建了两个单独的RDD,如下所示。

val f1 = sc.textFile("hdfs:///path to above data file")
val m1 = f1.map( s => (s.split(" ")(0),1) ) //creating a tuple (key,1)
//now if i create a RDD as
val rd1 = m1.reduceByKey((a,b) => a+b )
rd1.collect().foreach(println)
//I get a proper output i.e (it gives correct output every time)
//output: (4,1) (2,2) (7,1) (5,3) (3,2) (1,4) (10,1)

//but if i create a RDD as
val rd2 = m1.reduceByKey((a,b) => a+1 )
rd2.collect().foreach(println)
//I get a inconsistent result i.e some times i get this (WRONG)
//output: (4,1) (2,2) (7,1) (5,2) (3,2) (1,2) (10,1)
//and sometimes I get this as output (CORRECT)
//output: (4,1) (2,2) (7,1) (5,3) (3,2) (1,4) (10,1) 

我不明白为什么会发生这种情况以及何时使用哪种方法。我也尝试将RDD创建为
val m2 = f1.map(s => (s,1))
val rd3 = m2.reduceByKey((a,b) => a+1 )
// Then also same issue occurs with a+1 but every thing works fine with a+b
2个回答

8
reduceByKey 假设传入的函数是 可交换的可结合的 (正如文档明确说明的)。并且 - 你的第一个函数 (a, b) => a + b 是可交换和可结合的,但是 (a, b) => a+1 不是。 为什么? 首先,reduceByKey 将提供的函数应用于每个分区,然后再应用于所有分区的组合结果。换句话说,b 并不总是 1,因此使用 a+1 是不正确的。
想象一下以下情况-输入包含4条记录,分成两个分区:
(aa, 1)
(aa, 1)

(aa, 1)
(cc, 1)

在这个输入上执行reduceByKey(f)可能会被计算如下:
val intermediate1 = f((aa, 1), (aa, 1)) 
val intermediate2 = f((aa, 1), (cc, 1))

val result = f(intermediate2, intermediate1)

现在,让我们跟随这个例子:f = (a, b) => a + b
val intermediate1 = f((aa, 1), (aa, 1))       // (aa, 2)
val intermediate2 = f((aa, 1), (cc, 1))       // (aa, 1), (cc, 1)

val result = f(intermediate2, intermediate1)  // (aa, 3), (cc, 1)

使用 f = (a, b) => a + 1

val intermediate1 = f((aa, 1), (bb, 1))       // (aa, 2)
val intermediate2 = f((aa, 1), (cc, 1))       // (aa, 1), (cc, 1)

// this is where it goes wrong:
val result = f(intermediate2, intermediate1)  // (aa, 2), (cc, 1)

最重要的是,中间计算的顺序不能保证,在执行过程中可能会发生改变,对于不可交换函数的后一种情况,这意味着结果有时会出错。


非常感谢Tzach的澄清,但这应该每次都成立,对吗? 为什么有时候我会得到正确的结果? - Vikram Singh Chandel
1
@Tzach 我觉得这里的第二种情况会产生相同的结果。如果在最后一步发生了 val result = f(intermediate2, intermediate1),则会得到不同的结果,即 //(aa, 2), (cc, 1)。如果我理解有误,请告诉我。 - Rakesh Rakshit
当中间变量1(在分区中称为a)和中间变量2(在分区中称为b)到达驱动程序时,它们可能会被洗牌。中间变量1变成b,中间变量2变成a? 因此,代替 val result = f(intermediate1, intermediate2) 它变成了 val result = f(intermediate2, intermediate1) 因此,1+1==2而不是2+1==3。 - Vikram Singh Chandel
两个评论都是正确的 - 对于可交换性和结合性的需求源于顺序不能保证的事实。我会相应地更新答案 - 弄错了例子 :) - Tzach Zohar

3

函数 (a, b) => (a + 1) 在本质上不是可结合的。 可结合律规定:

f(a ,f(b , c)) = f(f(a , b), c) 

假设以下键:
a = (x, 1)
b = (x, 1)
c = (x, 1)

应用函数 (a, b) => (a + 1)
f(a ,f(b , c)) = (x , 2)

但是,

f(f(a , b), c) = (x , 3)

因此,它不是可结合的,也不适用于reduceByKey。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接