Pyspark RDD:查找元素的索引

6

我是Pyspark的新手,我试图将Python中的列表转换为RDD,然后使用RDD查找元素索引。对于第一部分,我正在执行以下操作:

list = [[1,2],[1,4]]
rdd = sc.parallelize(list).cache()

现在rdd实际上就是我的列表。问题是我想要找到任意元素的索引,就像Python列表中的“index”函数一样。我知道有一个叫做zipWithIndex的函数可以为每个元素分配索引,但我找不到Python的适当示例(有Java和Scala的示例)。

谢谢。


你所说的任意元素,是指像 [1,2] 这样的任意子列表吗? - Akshat Mahajan
@AkshatMahajan 是的,那就是我的意思。 - ahajib
1
不得不说明的是,在Pyspark中确实有一个zipWithIndex的示例。你可以在这里找到它。 - Akshat Mahajan
1个回答

13

使用filterzipWithIndex:

rdd.zipWithIndex().
filter(lambda (key,index) : key == [1,2]).
map(lambda (key,index) : index).collect()

注意这里的[1,2]可以很容易地更改为变量名,并且整个表达式可以包装在函数中。

它的工作原理

zipWithIndex只是简单地返回一个元组(item,index),如下所示:

rdd.zipWithIndex().collect()
> [([1, 2], 0), ([1, 4], 1)]

filter 查找符合特定标准(在本例中是 key 等于特定子列表)的内容。

rdd.zipWithIndex().filter(lambda (key,index) : key == [1,2]).collect()
> [([1, 2], 0)]

map很明显,我们可以直接得到索引:

rdd.zipWithIndex().filter(lambda (key,index) : key == [1,2]).
map(lambda (key,index): index).collect()
> [0]

如果需要的话,我们可以通过索引[0]来获取第一个元素。


问题是,当我在代码中添加“rdd.zipWithIndex.collect()”时,它返回以下错误:“AttributeError: 'function' object has no attribute 'collect'”。 - ahajib
我(指的是你)忘记了一个 ()。应该是 rdd.zipWithIndex().collect() - Akshat Mahajan
1
请注意,collect将返回一个列表。如果您有一个真正的大型数据集,请不要执行collect,而是在调用collect()之前先取样。 - Akshat Mahajan
@nimafl 更新了答案并更正了代码。感谢你指出错误 :) - Akshat Mahajan
谢谢您指出列表的问题,也感谢您的回答 :) - ahajib
提供一个lambda函数给filter()不再被支持了吗? - user1129682

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接