在Spark中使用Python计算Pairwise(K,V)RDD中每个KEY的平均值

39

我想分享这个特别的Apache Spark与Python解决方案,因为它的文档相当贫乏。

我想计算K/V对(存储在Pairwise RDD中)的平均值,按KEY分组。以下是示例数据的样子:

>>> rdd1.take(10) # Show a small sample.
[(u'2013-10-09', 7.60117302052786),
(u'2013-10-10', 9.322709163346612),
(u'2013-10-10', 28.264462809917358),
(u'2013-10-07', 9.664429530201343),
(u'2013-10-07', 12.461538461538463),
(u'2013-10-09', 20.76923076923077),
(u'2013-10-08', 11.842105263157894),
(u'2013-10-13', 32.32514177693762),
(u'2013-10-13', 26.249999999999996),
(u'2013-10-13', 10.693069306930692)]

现在以下代码序列是一种不太理想的方式,但它确实可以工作。这就是我在找到更好的解决方案之前所做的。它不是很糟糕,但--如您将在回答部分看到的那样--有一种更简洁、高效的方法。

>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by it's denominator (i.e. COUNT)
>>> print(rdd1.collect())
  [(u'2013-10-09', 11.235365503035176),
   (u'2013-10-07', 23.39500642456595),
   ... snip ...
  ]

如果我有NoneType值,并且我只想在aggregateByKey()函数中丢弃它们而不是之前,这是否可能? - QUEEN
4个回答

60
现在更好的做法是使用rdd.aggregateByKey()方法。因为这个方法在Apache Spark with Python文档中的说明非常不清楚(这也是我写这个问答的原因),所以直到最近我一直在使用上述代码序列。但是,这种方法效率较低,除非必要,否则避免使用这种方法。
以下是如何使用rdd.aggregateByKey()方法进行相同操作的步骤(推荐使用):
按键(KEY)同时计算SUM(我们想要计算的平均数的分子)和COUNT(我们想要计算的平均数的分母):
>>> aTuple = (0,0) # As of Python3, you can't pass a literal sequence to a function.
>>> rdd1 = rdd1.aggregateByKey(aTuple, lambda a,b: (a[0] + b,    a[1] + 1),
                                       lambda a,b: (a[0] + b[0], a[1] + b[1]))

以下是有关每个ab对的含义(以便您可以想象发生了什么):
   First lambda expression for Within-Partition Reduction Step::
   a: is a TUPLE that holds: (runningSum, runningCount).
   b: is a SCALAR that holds the next Value

   Second lambda expression for Cross-Partition Reduction Step::
   a: is a TUPLE that holds: (runningSum, runningCount).
   b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount).

最后,计算每个键的平均值,并收集结果。
>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect()
>>> print(finalResult)
      [(u'2013-09-09', 11.235365503035176),
       (u'2013-09-01', 23.39500642456595),
       (u'2013-09-03', 13.53240060820617),
       (u'2013-09-05', 13.141148418977687),
   ... snip ...
  ]

我希望这个关于aggregateByKey()的问题和答案能够帮到你。


这真的是一个很好的答案。但是需要注意的是,由于PEP 3113,这只适用于Python 2.x,因为在lambda表达式中不再支持元组解包。 - TayTay
@Tgsmith61591 谢谢。我添加了中间变量“aTuple”来解决这个问题。(叹气,我想不出更好的标识符名称,哈哈)。对PEP 3113的发现很好! - NYCeyes
"key1", (1, 1) "key1", (2, 1) => "key1", (3, 2)基于相同的a,b解释:.aggregateByKey(aTuple, lambda a, b: (a[0] + b[0], a[1] + 1), lambda a, b: (a[0] + b[0], a[1] + b[1]))这是对我有效的。 - kaushalop

13

在我看来,一个更易读的替代方法是使用两个lambda表达式的aggregateByKey:

rdd1 = rdd1 \
    .mapValues(lambda v: (v, 1)) \
    .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))

这样整个平均数的计算将会是:

avg_by_key = rdd1 \
    .mapValues(lambda v: (v, 1)) \
    .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) \
    .mapValues(lambda v: v[0]/v[1]) \
    .collectAsMap()

有人能解释一下 lambda 函数中的 a 和 b 的意义吗?@pat - bobthebuilder

5

关于该问题的一种直观但不佳的解决方案,这里添加一条说明。在书籍《24小时内快速掌握Apache Spark》的最后一章中详细解释了此问题。

使用groupByKey可以轻松解决该问题,方法如下:

rdd = sc.parallelize([
        (u'2013-10-09', 10),
        (u'2013-10-09', 10),
        (u'2013-10-09', 13),
        (u'2013-10-10', 40),
        (u'2013-10-10', 45),
        (u'2013-10-10', 50)
    ])

rdd \
.groupByKey() \
.mapValues(lambda x: sum(x) / len(x)) \
.collect()

输出:

[('2013-10-10', 45.0), ('2013-10-09', 11.0)]

这很直观和吸引人,但不要使用它groupByKey 不会在映射器上进行任何组合,并将所有单个键值对带到 reducer。
尽可能避免使用 groupByKey。选择像 @pat 的 reduceByKey 解决方案。

1
感谢您的建议和参考。 - Taka

1
对prismalytics.io的答案进行了轻微改进。
可能存在一种情况,即计算总和可能会因为我们正在求和大量值而溢出数字。相反,我们可以保留平均值,并从两个部分的平均值和计数得到的减少来计算平均值。
如果您有两个平均值和计数为(a1,c1)和(a2,c2)的部分,则总体平均值为: total/counts = (total1 + total2)/ (count1 + counts2) = (a1*c1 + a2*c2)/(c1+c2)
如果我们标记R = c2 / c1,则可以进一步重写为a1 /(1 + R)+ a2 * R /(1 + R) 如果我们进一步标记Ri为1 /(1 + R),则可以将其写为a1 * Ri + a2 * R * Ri
myrdd = sc.parallelize([1.1, 2.4, 5, 6.0, 2, 3, 7, 9, 11, 13, 10])
sumcount_rdd = myrdd.map(lambda n : (n, 1))
def avg(A, B):
    R = 1.0*B[1]/A[1]
    Ri = 1.0/(1+R);
    av = A[0]*Ri + B[0]*R*Ri
    return (av, B[1] + A[1]);

(av, counts) = sumcount_rdd.reduce(avg)
print(av)

这种方法可以通过使用mapValues代替map和reduceByKey代替reduce来转换为键值对。
这是来自于:https://www.knowbigdata.com/blog/interview-questions-apache-spark-part-2

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接