PySpark reduceByKey?如何添加键/元组

10
我有以下数据,我想做的是:
[(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]

对于每个键,计算值(一个1个字符串字符)的实例数。所以我首先做了一个映射:

.map(lambda x: (x[0], [x[1], 1]))

现在将其作为键/元组:
[(13, ['D', 1]), (14, ['T', 1]), (32, ['6', 1]), (45, ['T', 1]), (47, ['2', 1]), (48, ['0', 1]), (49, ['2', 1]), (50, ['0', 1]), (51, ['T', 1]), (53, ['2', 1]), (54, ['0', 1]), (13, ['A', 1]), (14, ['T', 1]), (32, ['6', 1]), (45, ['A', 1]), (47, ['2', 1]), (48, ['0', 1]), (49, ['2', 1]), (50, ['0', 1]), (51, ['X', 1])]

我只是最后一部分无法弄清楚如何针对每个密钥计算该字母的出现次数。例如,密钥13将有1个D和1个A。而14将有2个T等。


1
你希望先使用 groupByKey,然后在已分组的字符上执行计数。 - mattsilver
4个回答

7

我更熟悉Scala中的Spark,所以可能有比使用Counter更好的方法来计算由groupByKey生成的可迭代对象中的字符数,但这里提供一种选择:

from collections import Counter

rdd = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]) 
rdd.groupByKey().mapValues(Counter).collect()

[(48, Counter({'0': 2})),
 (32, Counter({'6': 2})),
 (49, Counter({'2': 2})),
 (50, Counter({'0': 2})),
 (51, Counter({'X': 1, 'T': 1})),
 (53, Counter({'2': 1})),
 (13, Counter({'A': 1, 'D': 1})),
 (45, Counter({'A': 1, 'T': 1})),
 (14, Counter({'T': 2})),
 (54, Counter({'0': 1})),
 (47, Counter({'2': 2}))]

3
哦,你已经使用了计数器!不幸的是,应该避免使用 groupByKey,因为它会将所有数据聚合在主节点上。而且,仅使用两个操作还不够。但是,对于紧凑性,我投一票! - Nikita
@ipoteka 有趣,我不知道 groupByKey 的低效性,你有详细阐述的好参考资料吗? - mattsilver
3
http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html - Nikita
不错的链接,非常棒的数据。很有道理。我怀疑有时 groupByKey 仍然足够快,但了解这一点非常好。 - mattsilver
@Nikita,它不会在“master”上聚合所有数据。但是,以非聚合形式进行洗牌并传输到执行器。这就是与reduceByKey的关键区别,后者在洗牌数据之前执行一次聚合步骤,因此(通常)在网络上传输的数据要少得多。 - Oliver W.

6

替代方案:

.map(lambda x: (x[0], [x[1], 1]))

我们可以这样做:
.map(lambda x: ((x[0], x[1]), 1))

在最后一步,我们可以使用reduceByKeyadd。请注意,add来自operator包。
将它们组合起来:
from operator import add
rdd = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]) 
rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(add).collect()

4
如果我理解正确,您可以在一个操作中使用combineByKey来实现:
from collections import Counter
x = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]) 
result = x.combineByKey(lambda value:  {value: 1}, 
...                     lambda x, value:  value.get(x,0) + 1,
...                     lambda x, y: dict(Counter(x) + Counter(y)))
result.collect()
[(32, {'6': 2}), (48, {'0': 2}), (49, {'2': 2}), (53, {'2': 1}), (13, {'A': 1, 'D': 1}), (45, {'A': 1, 'T': 1}), (50, {'0': 2}), (54, {'0': 1}), (14, {'T': 2}), (51, {'X': 1, 'T': 1}), (47, {'2': 2})]

@ohruunuruus 我已经编辑过了,但我不确定这个解决方案是否足够“Pythonic”。 - Nikita
1
我得到的一个错误是:AttributeError: 'str'对象没有'get'属性。 - theMadKing
哦,但是value是字典而不是字符串。我写的例子可以工作,结构与您的输入相同。无法建议出错了什么 :( - Nikita
我尝试运行你提供的代码,但它并没有按照你给出的完全一样的方式工作。 - theMadKing
每个字母都被第一个函数转换为字典。第二个函数合并字母和字典,第三个函数合并两个字典。 - Nikita
显示剩余4条评论

0

我尝试使用函数和 mapValues() 转换

def f(Counter): return Counter

from collections import Counter

rdd=sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')])
rdd.groupByKey().mapValues(Counter).collect()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接