使用pySpark将mapPartitions的结果转换为spark DataFrame

5

我有一份工作需要在分区的spark dataframe上运行,并且该过程如下:

rdd = sp_df.repartition(n_partitions, partition_key).rdd.mapPartitions(lambda x: some_function(x))

结果是一个含有pandas.dataframerdd
type(rdd) => pyspark.rdd.PipelinedRDD
type(rdd.collect()[0]) => pandas.core.frame.DataFrame

rdd.glom().collect() 的返回结果如下:

[[df1], [df2], ...]

现在我希望将结果转换为Spark数据框,我的做法是:
sp = None
for i, partition in enumerate(rdd.collect()):
    if i == 0:
        sp = spark.createDataFrame(partition)
    else:
        sp = sp.union(spark.createDataFrame(partition))

return sp


然而,结果可能会很大,rdd.collect()可能会超出驱动程序的内存限制,因此我需要避免使用collect()操作。有什么方法可以解决这个问题吗?
提前感谢!

1
你可以直接运行 rdd.toDF()。或者,spark.createDataFrame(rdd) - samkart
@samakart,不是很行,它会导致错误 ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all(). ,我猜它只适用于 Row - MJeremy
@dre-hh 这是你要找的吗?(https://medium.com/@mrpowers/adding-structtype-columns-to-spark-dataframes-b44125409803) - samkart
是的和不是的:)所以,这是一种以Python类型提供模式的方法。但是,schema参数还接受更短的SQL DSL表示法。例如。 createDataFrame(x, schema="uuid_id STRING, url STRING, title STRING") 我试图在文档中找到它支持哪些类型,但那些基本上是Python数据类型在SQL符号中的类比。 - dre-hh
不要忘记提到,使用rdd创建DataFrame时,不能将pandasDataframe作为rdd的值。该值必须是受支持的纯Python数据结构。例如,您可以使用rdd.Row类型的列表。 请参阅https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=createdataframe#pyspark.sql.SQLContext.createDataFrame - dre-hh
显示剩余2条评论
3个回答

3
如果您想要继续使用RDD API,mapPartitions需要接受一个类型的迭代器,并期望返回另一种类型的迭代器作为结果。pandas_df不是mapPartitions可以直接处理的迭代器类型。如果您必须使用pandas API,可以从pandas.iterrows创建一个适当的生成器。
这样,您整体的mapPartitions结果将是一种行类型的单个RDD,而不是Pandas数据帧的RDD。这种RDD可以轻松地通过即时模式发现架构转换回数据帧。
from pyspark.sql import Row

def some_fuction(iter):
  pandas_df = some_pandas_result(iter)
  for index, row in pandas_df.iterrows():
     yield Row(id=index, foo=row['foo'], bar=row['bar'])


rdd = sp_df.repartition(n_partitions, partition_key).rdd.mapPartitions(lambda x: some_function(x))
df = spark.createDataFrame(rdd)

所以这些方法需要事先知道生成的 Pandas 数据框的列吗? - MJeremy
2
感谢您花时间深入研究这个问题。我通过您的建议解决了它:将其转换为“行”然后“创建数据帧”。我应用的代码是将pandas数据框的每一行附加到Row对象列表中: row_list.append(Row(**row_dict)) - MJeremy
很好,它也可以像这样动态工作。我不确定。有一天也可以尝试使用pandas udf(下面的答案)。缺点是需要指定正确的行类型。但它将使用apache airflow,使用现代CPU SIMD指令,并且从pandas到dataframe的转换将在更优化的代码背后发生。 - dre-hh
做得好,我会记下 row_list.append(Row(**row_dict)) - WestCoastProjects

2
您可以直接在数据框上使用新的pandas 分组UDF,而不是使用rdd.mapPartitions。该函数本身接受一个 pandas 数据框作为组,并返回 pandas 数据框。
当它与 Spark 数据框应用 API 一起使用时,Spark 会自动将分区的 pandas 数据框组合成一个新的 Spark 数据框。
# a grouped pandas_udf receives the whole group as a pandas dataframe
# it must also return a pandas dataframe
# the first schema string parameter must describe the return dataframe schema

# in this example the result dataframe contains 2 columns id and value
@pandas_udf("id long, value double", PandasUDFType.GROUPED_MAP)
def some_function(pdf):
    return pdf.apply(some_pdf_func)

df.groupby(df.partition_key).apply(some_function).show()

有趣但实际上不如@MJeremy的方法多才多艺,他使用row_list.append(Row(**row_dict))来处理任意列长度和类型。 - WestCoastProjects

-1

你可以做:

sp = None 
def f(x):
 sp = spark.createDataFrame(x)
 return (sp)
sp = sp.union(rdd.foreach(f))

参考:

Spark SQL DataFrame

Spark RDD

如果有效,请点赞。

嗨,感谢您的深入挖掘。恐怕它不起作用。sp=None => 'NoneType' object has no attribute 'union'foreach => SparkContext只能在驱动程序上使用,而不能在运行在工作节点上的代码中使用 - MJeremy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接