Python/PySpark最佳实践:如何复制DataFrame列?

5
这是针对使用Spark 2.3.2的Python/PySpark的技术问题。我正在寻找最佳实践方法,将一个数据帧的列复制到另一个数据帧中,对于一个非常大的数据集(按年/月/日分区,均匀地分配),此数据集包含10亿多行数据。每个数据行有120个需要转换或复制的列。输出数据框将被写入,日期分区,到另一个Parquet文件集中。
示例架构如下: 输入DFInput(colA,colB,colC)和 输出DFoutput(X,Y,Z) 我的目标是将DFinput复制到DFoutput,如下所示: (colA => Z,colB => X,colC => Y)。 在Python Spark 2.3+中,最佳实践是什么? 我应该为每个源列使用DF.withColumn()方法来复制到目标列吗? 考虑到每个拥有超过110列要复制的十亿行,这样做会表现良好吗?
谢谢。

withColumns 的性能存在问题。 - thebluephantom
5个回答

2

在 PySpark 中处理列映射的另一种方法是使用 字典。字典可以帮助您将初始数据帧的列映射到最终数据帧的列,使用 键/值 结构,如下所示:

from pyspark.sql.functions import col

df = spark.createDataFrame([
  [1, "John", "2019-12-01 10:00:00"],
  [2, "Michael", "2019-12-01 11:00:00"],
  [2, "Michael", "2019-12-01 11:01:00"],
  [3, "Tom", "2019-11-13 20:00:00"],
  [3, "Tom", "2019-11-14 00:00:00"],
  [4, "Sofy", "2019-10-01 01:00:00"]
], ["A", "B", "C"])


col_map = {"A":"Z", "B":"X", "C":"Y"}

df.select(*[col(k).alias(col_map[k]) for k in col_map]).show()

# +---+-------+-------------------+
# |  Z|      X|                  Y|
# +---+-------+-------------------+
# |  1|   John|2019-12-01 10:00:00|
# |  2|Michael|2019-12-01 11:00:00|
# |  2|Michael|2019-12-01 11:01:00|
# |  3|    Tom|2019-11-13 20:00:00|
# |  3|    Tom|2019-11-14 00:00:00|
# |  4|   Sofy|2019-10-01 01:00:00|
# +---+-------+-------------------+

在这里,我们将A、B、C分别映射为Z、X、Y。

如果您想要模块化的解决方案,您还可以将所有内容放入一个函数中:

def transform_cols(mappings, df):
  return df.select(*[col(k).alias(mappings[k]) for k in mappings])

通过使用猴子补丁将现有的DataFrame类的功能进行扩展,使其更加模块化。将下面的代码放在PySpark代码的顶部(您还可以创建一个迷你库,并在需要时将其包含在代码中):

from pyspark.sql import DataFrame

def transform_cols(self, mappings):
  return self.select(*[col(k).alias(mappings[k]) for k in mappings])

DataFrame.transform = transform_cols

然后使用以下方式调用:

df.transform(col_map).show()

PS:这可能是通过创建自己的库并通过DataFrame和"monkey patching"(对于熟悉C#的人来说,这是一种扩展方法)来扩展DataFrame功能的便捷方式。


这是一个好的解决方案,但我如何在原始数据框中进行更改呢?这里 df.select 返回了新的 df。我希望将列添加到我的原始 df 中。 - Vikas Garud

1
使用dataframe.withColumn()方法,通过添加新列或替换已存在的同名列来返回一个新的DataFrame。

1
使用Apache Spark的方法 - 就我理解你的问题而言 - 是将输入的DataFrame转换为所需的输出DataFrame。您可以简单地在输入DataFrame上使用selectExpr来完成此任务:
outputDF = inputDF.selectExpr("colB as X", "colC as Y", "colA as Z")

这种转换不会将输入的DataFrame数据“复制”到输出的DataFrame中。

1
我发现的这个有趣的例子展示了两种方法,其中更好的方法与其他答案相符。这是Scala,而不是pyspark,但同样的原则适用,尽管例子不同。
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
             ("1","2", "3"),
             ("4", "5", "6"),
             ("100","101", "102")
            ).toDF("c1", "c2", "c3")

这很昂贵,那是withColumn,每次迭代都会创建一个新的DF:

val df2 = df.columns.foldLeft(df) { case (df, col) =>
          df.withColumn(col, df(col).cast("int"))
          }
//df2.show(false)

这是更快的。
val df3 = df.select(df.columns.map { col =>
          df(col).cast("int")
          }: _*)
//df3.show(false)

0

我在编程方面有点新手(使用Python),但是是否可以更容易地在SQL(或您拥有的任何其他源)中完成此操作,然后将其读入新/单独的数据框中?


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接