你的错误源于需要向
withColumn
传递一个
Column
对象。
以下是两种将日期作为Spark
DataFrame
的新列添加的方法(使用每个记录的顺序进行连接),具体取决于日期数据的大小。
1)如果操作的是小型数据集
一种简洁的方法是对单调递增的id应用UDF:
from pyspark.sql.functions import udf, monotonically_increasing_id
df = [...]
dates = [2017, 2018, 2018, 2018, 2019, 2019, 2019, 2020, 2020, 2020]
df = df.repartition(1).withColumn(
"YEARS",
udf(lambda id: dates[id])(monotonically_increasing_id()))
df.show()
输出:
+---+-----+
|...|YEARS|
+---+-----+
|...| 2017|
|...| 2018|
|...| 2018|
|...| 2018|
|...| 2019|
|...| 2019|
|...| 2019|
|...| 2020|
|...| 2020|
|...| 2020|
+---+-----+
注意: 使用.repartition(1)
确保生成的id连续。如果您有另一种方法将每个记录映射到dates
中的值(例如已构建的id列),则可以避免将其重新分区为单个分区。
/!\ 如果数据框和Python列表太大,则无法扩展:
- 需要重分区数据框,导致昂贵的Shuffle/Exchange操作
.repartition(1)
可能会导致生成非常大的分区,处理速度非常慢(因为它是巨大的,如果不适合执行内存,它可能意味着许多额外的磁盘I/O将RDD块溢出到磁盘),或者使作业崩溃,并引发OutOfMemoryError
- Python列表由UDF (通过lambda闭包)捕获,这意味着它将被广播到集群的每个执行器
2) 如果您要处理大小>数百万行的数据集
以下是另一种方法,可以使用Pandas操纵ids和dates列,避免对Spark DataFrame
进行任何重新分区,从而更好地处理数百万行的数据。
可以像这样完成:
import pandas as pd
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
df = [...]
df = df.withColumn("id", monotonically_increasing_id())
spark_df_ids = df.select("id").toPandas()
dates = [2017, 2018, 2018, 2018, 2019, ..., 2019, 2019, 2020, 2020, 2020]
dates_pandas_df = pd.DataFrame(dates, columns=["YEARS"])
dates_and_ids_pandas_df = dates_pandas_df.join(spark_df_ids)
dates_and_ids_spark_df = spark.createDataFrame(dates_and_ids_pandas_df)
df.join(dates_and_ids_spark_df, ["id"]).show()
重要提示:使用 Apache Arrow 可以更快地进行与 pandas 之间的转换。了解有关在 Spark 中使用 Apache Arrow 的详情
.repartition(1)
,但是对于数百万行,另一个潜在的巨大成本显然并不便宜:udf(通过lambda闭包)捕获了python列表,这意味着它将被广播。因此,在这个规模上,最好直接使用pandas,然后将您的pandas dataframe转换为spark dataframe:spark.createDataFrame(pandas_df)
。启用Apache Arrow可以加快速度。如果有帮助,可以接受此答案,也许开另一个问题。 - bonnal-enzo