PySpark的groupByKey返回pyspark.resultiterable.ResultIterable。

62

我正在努力弄清楚为什么我的groupByKey返回以下内容:

[(0, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a210>), (1, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a4d0>), (2, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a390>), (3, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a290>), (4, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a450>), (5, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a350>), (6, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a1d0>), (7, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a490>), (8, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a050>), (9, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a650>)]

我已经使用了 flatMap 处理类似这样的值:

[(0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D')]

我只是在做一件简单的事:

groupRDD = columnRDD.groupByKey()
6个回答

85
你得到的是一个对象,它允许你迭代结果。通过在值上调用list(),你可以将groupByKey的结果转换为列表,例如:

你得到的是一个对象,它允许你迭代结果。通过在值上调用list(),你可以将groupByKey的结果转换为列表,例如:

example = sc.parallelize([(0, u'D'), (0, u'D'), (1, u'E'), (2, u'F')])

example.groupByKey().collect()
# Gives [(0, <pyspark.resultiterable.ResultIterable object ......]

example.groupByKey().map(lambda x : (x[0], list(x[1]))).collect()
# Gives [(0, [u'D', u'D']), (1, [u'E']), (2, [u'F'])]

40
example.groupByKey().mapValues(list).collect() 更简洁并且也有效。 - Charity Leschinski
5
我该如何遍历 ResultIterable 类型? - xxx222

31

你也可以使用

example.groupByKey().mapValues(list)

1

例子:

r1 = sc.parallelize([('a',1),('b',2)])
r2 = sc.parallelize([('b',1),('d',2)])
r1.cogroup(r2).mapValues(lambda x:tuple(reduce(add,__builtin__.map(list,x))))

结果:

[('d', (2,)), ('b', (2, 1)), ('a', (1,))]

1

建议您使用cogroup()而不是groupByKey()。您可以参考以下示例。

[(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]

例子:

>>> x = sc.parallelize([("foo", 1), ("bar", 4)])
>>> y = sc.parallelize([("foo", -1)])
>>> z = [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
>>> print(z)

你应该得到期望的输出...


1
除了以上回答,如果您想要排序后的唯一项目列表,请使用以下内容:

不同且排序值列表

example.groupByKey().mapValues(set).mapValues(sorted)

仅列出排序后的值
example.groupByKey().mapValues(sorted)

替代上述内容
# List of distinct sorted items
example.groupByKey().map(lambda x: (x[0], sorted(set(x[1]))))

# just sorted list of items
example.groupByKey().map(lambda x: (x[0], sorted(x[1])))

0

假设你的代码是...

ex2 = ex1.groupByKey()

然后你运行...

ex2.take(5)

你将会看到一个可迭代对象。如果你要对这些数据做一些操作的话,那么这样就没问题了,你可以直接继续。但是,如果你只想先打印/查看值再继续进行,这里有一个小技巧。

ex2.toDF().show(20, False)

或者只是

ex2.toDF().show()

这将显示数据的值。您不应该使用collect(),因为它会将数据返回给驱动程序,如果您正在处理大量数据,那么这将使您的程序崩溃。现在,如果ex2 = ex1.groupByKey()是您的最后一步,并且您想要返回这些结果,则可以使用collect(),但请确保您知道返回的数据量很小。

print(ex2.collect())

这是关于在RDD上使用collect()的另一篇不错的文章。

查看Python Spark中的RDD内容?


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接