PySpark:向DataFrame添加更多列的最佳实践

4

Spark Dataframes有一个方法withColumn,可以一次添加一个新列。要添加多个列,需要一系列的withColumn。这是最佳实践吗?

我认为使用mapPartitions有更多的优势。假设我有一个包含三个withColumn的链以及一个过滤器来根据某些条件删除Row。这是四个不同的操作(虽然我不确定是否有任何宽转换)。但如果我使用mapPartitions,我可以一次性完成所有操作。如果我有一个数据库连接,我希望每个RDD分区只打开一次。

我的问题有两个部分。

第一部分,这是我的mapPartitions实现。这种方法是否存在未预见的问题?是否有更加优雅的方法来实现?

df2 = df.rdd.mapPartitions(add_new_cols).toDF()

def add_new_cols(rows):
    db = open_db_connection()
    new_rows = []
    new_row_1 = Row("existing_col_1", "existing_col_2", "new_col_1", "new_col_2")
    i = 0
    for each_row in rows:
        i += 1
        # conditionally omit rows
        if i % 3 == 0:
            continue
        db_result = db.get_some_result(each_row.existing_col_2)
        new_col_1 = ''.join([db_result, "_NEW"])
        new_col_2 = db_result
        new_f_row = new_row_1(each_row.existing_col_1, each_row.existing_col_2, new_col_1, new_col_2)
        new_rows.append(new_f_row)

    db.close()
    return iter(new_rows)

第二部分,使用mapPartitions与使用withColumnfilter链的折衷是什么?
我曾经在某处读到,使用Spark DF中提供的方法总是比自己实现更好。如果我的论点有误,请告诉我。谢谢!欢迎所有想法。

你能分享一個你無法解決的問題的例子嗎?目前為止,你的問題有點過於廣泛且含混不清。 - mtoto
2
如果您担心的是withColumn的链式操作会串行执行,那么请放心,因为Lazy Spark会对这些操作进行优化,使得它们不会串行执行。 - pault
1
为了避免两次打开数据库连接,您可以返回一个列表,然后将输出拆分成列。类似于 df = df.withColumn('list_output', myUDF()).select("*", col('list_output')[0].alias('new_col1'), col('list_output)[1].alias('new_col2')).drop("list_output")。将其转换为RDD再转回DF会很慢,但我对此不是专家。 - pault
1
@void 看一下这篇帖子。你也可以使用你的udf返回一个StructType(),然后使用list_output.* - pault
显示剩余5条评论
2个回答

6
这种方法有没有未预见的问题?
有多个问题。最严重的影响是:
1.与普通DataFrame代码相比,内存占用要高几倍,并且需要进行大量垃圾回收。
2.在执行上下文之间移动数据需要高昂的序列化和反序列化成本。
3.在查询规划器中引入了破坏点。
4.在toDF调用上,模式推断的成本较高(如果提供了正确的模式,则可以避免),并且可能需要重新执行所有前面的步骤。
5.等等……
其中一些可以通过使用udf和select/withColumn来避免,而其他则不能。
假设我有一个由三个withColumns组成的链,然后再根据某些条件删除行的过滤器。这些是四个不同的操作(我不确定其中任何一个是否是宽转换)。但是,如果我使用mapPartitions,我可以一次性完成所有操作。
您的mapPartitions并不会消除任何操作,也不会提供任何优化,这些优化可以在Spark计划程序中排除。它的唯一优点是为昂贵的连接对象提供了一个良好的作用域。
我在某个地方读到,使用可用的Spark DF方法总是比自己实现更好。
当您开始使用执行器端Python逻辑时,您已经偏离了Spark SQL。无论您使用udf、RDD还是新添加的矢量化udf,都没有关系。归根结底,您应该根据代码的整体结构做出决策——如果它主要是在数据上直接执行Python逻辑,那么最好坚持使用RDD或完全跳过Spark。
如果它只是一小部分逻辑,并且不会导致严重的性能问题,请不要担心它。

-1

使用 df.withColumn() 是添加列的最佳方法,它们都是惰性添加的


我同意。难道不是所有的转换都是惰性求值的吗?即使是mapPartition也应该是。请查看问题的评论。我已经向pault提出了一个关注点。 - void
好的,我看了其他评论和你的问题。我对Python语法不是很熟悉,所以根据我的猜测,它应该与Scala相同。在Scala API中,您传递给add_new_cols的“行”实际上是一个“Iterator[Row]”。为了在查询中使用这些行的内容,您需要将这些迭代器具体化,这样做将耗尽这些迭代器。 - Chitral Verma
1
我建议您在这种情况下不要使用withColumns,因为这样会导致您为每一行创建/使用一个单独的连接。相反,使用mapPartitions,并在每个分区级别上与外部数据库建立连接,其中连接来自某个单例池。 - Chitral Verma
我对这里的问题陈述不是很熟悉,但如果我要解决这个问题,我可能会使用Spark API将外部表作为JDBC源读取,找到此DF和我的现有DF之间的公共键,然后将它们连接起来。在这个连接的DF上,我可以进行所有的处理。 - Chitral Verma

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接