将 Python 列表添加为 PySpark DataFrame 的新列

7

我有一个列表:

dates = [2017, 2018, 2018, 2018, 2019, 2019, 2019, 2020, 2020, 2020]

我尝试将数据框添加到长度相同的另一个数据框中(这方面没有问题)。

我尝试了以下方法:

df = df.withColumn("YEARS", dates)
Error: Column needs to be col

我也尝试了:

df = df.withColumn("YEARS", f.lit(dates))

但是那并不起作用。

我看到了这个问题:如何在Spark DataFrame中添加常量列?

但是那里的所有内容对于这种情况都没有用。

更新: 期望的结果是:

df_columns...   | dates_from_list
---------------------------------
original_df_data| 2017
original_df_data| 2018
original_df_data| 2018
original_df_data| 2018
original_df_data| 2019
original_df_data| 2019
original_df_data| 2019
original_df_data| 2020
original_df_data| 2020
original_df_data| 2020
2个回答

12
你的错误源于需要向withColumn传递一个Column对象。
以下是两种将日期作为Spark DataFrame的新列添加的方法(使用每个记录的顺序进行连接),具体取决于日期数据的大小。

1)如果操作的是小型数据集

一种简洁的方法是对单调递增的id应用UDF:
from pyspark.sql.functions import udf, monotonically_increasing_id

df = [...]  # 10 records

dates = [2017, 2018, 2018, 2018, 2019, 2019, 2019, 2020, 2020, 2020]

df = df.repartition(1).withColumn(
    "YEARS", 
    udf(lambda id: dates[id])(monotonically_increasing_id()))

df.show()

输出:

+---+-----+
|...|YEARS|
+---+-----+
|...| 2017|
|...| 2018|
|...| 2018|
|...| 2018|
|...| 2019|
|...| 2019|
|...| 2019|
|...| 2020|
|...| 2020|
|...| 2020|
+---+-----+

注意: 使用.repartition(1)确保生成的id连续。如果您有另一种方法将每个记录映射到dates中的值(例如已构建的id列),则可以避免将其重新分区为单个分区。

/!\ 如果数据框和Python列表太大,则无法扩展:

  • 需要重分区数据框,导致昂贵的Shuffle/Exchange操作
  • .repartition(1)可能会导致生成非常大的分区,处理速度非常慢(因为它是巨大的,如果不适合执行内存,它可能意味着许多额外的磁盘I/O将RDD块溢出到磁盘),或者使作业崩溃,并引发OutOfMemoryError
  • Python列表由UDF (通过lambda闭包)捕获,这意味着它将被广播到集群的每个执行器

2) 如果您要处理大小>数百万行的数据集

以下是另一种方法,可以使用Pandas操纵ids和dates列,避免对Spark DataFrame进行任何重新分区,从而更好地处理数百万行的数据。

可以像这样完成:

import pandas as pd
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()

# some spark DataFrame of length N
df = [...]  

# generate monotically increasing ids (not consecutive) without repartitioning the Spark DataFrame.
df = df.withColumn("id", monotonically_increasing_id())

# get generated ids (not consecutive) as a mono-column pandas DataFrame
spark_df_ids = df.select("id").toPandas()

# some python list of length N
dates = [2017, 2018, 2018, 2018, 2019, ..., 2019, 2019, 2020, 2020, 2020]

# build pandas DataFrame from dates
dates_pandas_df = pd.DataFrame(dates, columns=["YEARS"])

# append the id column to the dates in pandas
dates_and_ids_pandas_df = dates_pandas_df.join(spark_df_ids)

# convert from pandas DataFrame to spark DataFrame
dates_and_ids_spark_df = spark.createDataFrame(dates_and_ids_pandas_df)

# Perform the final adding of the dates column to the Spark DataFrame with a join in Spark
df.join(dates_and_ids_spark_df, ["id"]).show()

重要提示:使用 Apache Arrow 可以更快地进行与 pandas 之间的转换。了解有关在 Spark 中使用 Apache Arrow 的详情


我在示例中使用了10行,仅供参考。如果数据集为1-2百万行,使用这个UDF会不会影响效率?(我通常使用Pandas编码,但由于我正在处理的数据集的大小,我正在转换为PySpark) - Toby Djelyinski
即使您使用另一种方式将数据帧记录映射到python列表的元素,从而避免了.repartition(1),但是对于数百万行,另一个潜在的巨大成本显然并不便宜:udf(通过lambda闭包)捕获了python列表,这意味着它将被广播。因此,在这个规模上,最好直接使用pandas,然后将您的pandas dataframe转换为spark dataframe:spark.createDataFrame(pandas_df)。启用Apache Arrow可以加快速度。如果有帮助,可以接受此答案,也许开另一个问题。 - bonnal-enzo
1
谢谢,您可以看一下我的修改,这样会有一个更具可扩展性的替代方案。 - bonnal-enzo
1
@EnzoBnl..我喜欢上面的方法。这里有另一种类似的方法,但是列表元素和数据帧行数必须相同。https://stackoverflow.com/questions/58188495/adding-a-list-element-as-a-column-to-existing-pyspark-dataframe/58225812#58225812 - vikrant rana

1
你可以尝试这个:
dates = [2017, 2018, 2018, 2018, 2019, 2019, 2019, 2020, 2020, 2020]
df = spark.createDataFrame([Row(a=1)])
df = df.withColumn("YEARS",  array( [lit(x) for x in dates]  ))


df.show(truncate=False)
+---+------------------------------------------------------------+
|a  |YEARS                                                       |
+---+------------------------------------------------------------+
|1  |[2017, 2018, 2018, 2018, 2019, 2019, 2019, 2020, 2020, 2020]|
+---+------------------------------------------------------------+

df.select("a", explode("YEARS")).show()
+---+----+
|  a| col|
+---+----+
|  1|2017|
|  1|2018|
|  1|2018|
|  1|2018|
|  1|2019|
|  1|2019|
|  1|2019|
|  1|2020|
|  1|2020|
|  1|2020|
+---+----+

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接