Spark Dataframes有一个方法withColumn
,可以一次添加一个新列。要添加多个列,需要一系列的withColumn
。这是最佳实践吗?
我认为使用mapPartitions
有更多的优势。假设我有一个包含三个withColumn
的链以及一个过滤器来根据某些条件删除Row
。这是四个不同的操作(虽然我不确定是否有任何宽转换)。但如果我使用mapPartitions
,我可以一次性完成所有操作。如果我有一个数据库连接,我希望每个RDD分区只打开一次。
我的问题有两个部分。
第一部分,这是我的mapPartitions
实现。这种方法是否存在未预见的问题?是否有更加优雅的方法来实现?
df2 = df.rdd.mapPartitions(add_new_cols).toDF()
def add_new_cols(rows):
db = open_db_connection()
new_rows = []
new_row_1 = Row("existing_col_1", "existing_col_2", "new_col_1", "new_col_2")
i = 0
for each_row in rows:
i += 1
# conditionally omit rows
if i % 3 == 0:
continue
db_result = db.get_some_result(each_row.existing_col_2)
new_col_1 = ''.join([db_result, "_NEW"])
new_col_2 = db_result
new_f_row = new_row_1(each_row.existing_col_1, each_row.existing_col_2, new_col_1, new_col_2)
new_rows.append(new_f_row)
db.close()
return iter(new_rows)
第二部分,使用
mapPartitions
与使用withColumn
和filter
链的折衷是什么?我曾经在某处读到,使用Spark DF中提供的方法总是比自己实现更好。如果我的论点有误,请告诉我。谢谢!欢迎所有想法。
withColumn
的链式操作会串行执行,那么请放心,因为Lazy Spark会对这些操作进行优化,使得它们不会串行执行。 - paultdf = df.withColumn('list_output', myUDF()).select("*", col('list_output')[0].alias('new_col1'), col('list_output)[1].alias('new_col2')).drop("list_output")
。将其转换为RDD再转回DF会很慢,但我对此不是专家。 - paultStructType()
,然后使用list_output.*
。 - pault