如何在pyspark中循环遍历每一行数据框

82

例如

sqlContext = SQLContext(sc)

sample=sqlContext.sql("select Name ,age ,city from user")
sample.show()

上述语句在终端上打印整个表格。但我想使用forwhile访问该表中的每一行以执行进一步的计算。


我相信我提供了正确的答案。您能选择或提供反馈以改进吗? - aaronsteers
7个回答

78

你无法直接进行迭代。与其他分布式数据结构一样,DataFrames不是可迭代的,只能使用专门的高阶函数和/或SQL方法访问。

当然,你可以使用collect

for row in df.rdd.collect():
    do_something(row)

或将toLocalIterator转换
for row in df.rdd.toLocalIterator():
    do_something(row)

虽然可以像上面展示的一样在本地进行迭代,但这打败了使用Spark的所有目的。


1
新手问题:由于迭代已经收集的数据框“打败了目的”,从数据框中,我应该如何挑选出需要进一步处理的行? - Jari Turkia
2
做了一些阅读,看起来使用 where() 形成一个新的数据帧将是正确的 Spark 方法。 - Jari Turkia
3
“it beats all purpose of using Spark” 是相当强烈和主观的措辞。collect() 方法是有存在的意义的,而且有许多有效的使用情况。一旦 Spark 处理完数据,遍历最终结果可能是与外部 API 或传统系统集成/写入的唯一方式。 - Marco Roy

70
为了“循环”并利用Spark的并行计算框架,您可以定义一个自定义函数并使用map。
def customFunction(row):

   return (row.name, row.age, row.city)

sample2 = sample.rdd.map(customFunction)
或者
sample2 = sample.rdd.map(lambda x: (x.name, x.age, x.city))

自定义函数将应用于数据框的每一行。请注意,sample2将是一个RDD,而不是数据框。

如果您要执行更复杂的计算,则可能需要使用Map。如果您只需要添加一个简单的派生列,则可以使用返回数据框的withColumn

sample3 = sample.withColumn('age2', sample.age + 2)

请问您能告诉我如何实际使用customFunction,以便返回值可以在循环内用于进一步处理吗?我有一个基于collect()的方法,但我的数据太大了,导致Pyspark(v. 3)失败。谢谢! - IrfanClemson
嗨 @David,如果我在RDD上使用map(),每一行是否会按顺序运行customFunction()?在我的情况下,我希望每一行都能按顺序进行处理。 - undefined

14
在Python中使用列表推导式,你可以仅用两行代码将整个列的值收集到一个列表中:

使用Python中的列表推导式,您只需要两行代码就可以将整个列的值收集到一个列表中:

df = sqlContext.sql("show tables in default")
tableList = [x["tableName"] for x in df.rdd.collect()]
在上面的示例中,我们返回数据库'default'中表的列表,但是可以通过替换在sql()中使用的查询来进行调整。
或者更简短地说:
tableList = [x["tableName"] for x in sqlContext.sql("show tables in default").rdd.collect()]

对于您提出的三列示例,我们可以创建一个字典列表,然后在for循环中对其进行迭代。

sql_text = "select name, age, city from user"
tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 
             for x in sqlContext.sql(sql_text).rdd.collect()]
for row in tupleList:
    print("{} is a {} year old from {}".format(
        row["name"],
        row["age"],
        row["city"]))

9

Give A Try Like this

    result = spark.createDataFrame([('SpeciesId','int'), ('SpeciesName','string')],["col_name", "data_type"]); 
    for f in result.collect(): 
        print (f.col_name)

8

这可能不是最好的做法,但您可以使用collect()直接定位到特定列并将其导出为行列表,然后遍历该列表。

假设这是您的数据框:

+----------+----------+-------------------+-----------+-----------+------------------+ 
|      Date|  New_Date|      New_Timestamp|date_sub_10|date_add_10|time_diff_from_now|
+----------+----------+-------------------+-----------+-----------+------------------+ 
|2020-09-23|2020-09-23|2020-09-23 00:00:00| 2020-09-13| 2020-10-03| 51148            | 
|2020-09-24|2020-09-24|2020-09-24 00:00:00| 2020-09-14| 2020-10-04| -35252           |
|2020-01-25|2020-01-25|2020-01-25 00:00:00| 2020-01-15| 2020-02-04| 20963548         |
|2020-01-11|2020-01-11|2020-01-11 00:00:00| 2020-01-01| 2020-01-21| 22173148         |
+----------+----------+-------------------+-----------+-----------+------------------+

循环遍历日期列中的行:

rows = df3.select('Date').collect()

final_list = []
for i in rows:
    final_list.append(i[0])

print(final_list)

2
如果你想对DataFrame对象中的每一行进行操作,可以使用map。这将允许你在每一行上执行进一步的计算。它相当于从0len(dataset)-1循环遍历整个数据集。
请注意,这将返回一个PipelinedRDD,而不是一个DataFrame。

1
以上
tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 

应该是


tupleList = [{'name':x["name"], 'age':x["age"], 'city':x["city"]} 

对于name、age和city,它们不是变量,而只是字典中的键。

代码第二行右侧缺少一个方括号吗? - Geoffrey Anderson
1
当你没有回答原始问题时,请不要将其作为答案发布,而是更倾向于在评论中提问或建议对部分正确的答案进行编辑。 - Aniruddha Kalburgi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接