使用pySpark迭代Data Frame的每一行

3
我需要使用pySpark遍历一个数据框就像我们可以使用for循环遍历一组值一样。以下是我编写的代码。这个代码的问题是:
  1. 我必须使用collect破坏了并行性
  2. 我无法在函数funcRowIter中打印任何DataFrame中的值
  3. 一旦找到匹配,我就不能打破循环。
我必须在pySpark中完成这个任务,不能使用pandas:
from pyspark.sql.functions import *
from pyspark.sql import HiveContext
from pyspark.sql import functions
from pyspark.sql import DataFrameWriter
from pyspark.sql.readwriter import DataFrameWriter
from pyspark import SparkContext

sc = SparkContext()
hive_context = HiveContext(sc)

tab = hive_context.sql("select * from update_poc.test_table_a")

tab.registerTempTable("tab")
print type(tab)

df = tab.rdd

def funcRowIter(rows):
    print type(rows)
        if(rows.id == "1"):
            return 1

df_1 = df.map(funcRowIter).collect()
print df_1
2个回答

4

不要使用df.map(funcRowIter).collect(),而是尝试使用UDF。希望这能帮到你。

from pyspark.sql.functions import struct
from pyspark.sql.functions import *
def funcRowIter(rows):
    print type(rows)
    if(row is nor None and row.id is not None)
        if(rows.id == "1"):
            return 1
A = udf(funcRowIter, ArrayType(StringType()))
z = df.withColumn(data_id, A(struct([df[x] for x in df.columns])))
z.show()

collect()不适合处理非常大的数据,例如数百万条记录。


0

看起来你的目标是显示特定行。你可以使用.filter然后使用.collect

例如,

row_1 = rdd.filter(lambda x: x.id==1).collect()

然而,尝试这种迭代方式遍历数据框将不会是高效的。


我正在尝试不使用collect()来完成这个操作,因为collect()会破坏并行性。 - Ashay Dhavale

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接