在 Spark 数据框中为日期列增加月份

11

我有一个场景,希望在Spark DataFrame的日期列中添加月份,该列具有两个数据类型为(Date,Int)的列。

例如:

df.show()
data_date months_to_add
2015-06-23 5
2016-07-20 7

我想添加一个新列,该列将有一个新日期(在现有日期的基础上添加几个月),输出将如下所示 -

data_date month_to_add new_data_date
2015-06-23 5           2015-11-23
2016-07-20 1           2016-8-20

我已经尝试了以下代码,但似乎没有起作用-

df = df.withColumn("new_data_date", a
  dd_months(col("data_date"), col("months_to_add")))

它给我报错-

'Column' object is not callable

请帮我找到不需要在数据框上使用SQL查询的方法来完成此操作。


1
@jeanr:add_months是一个预定义的Spark函数,用于将月份(整数值)添加到日期中。 - anurag
你是否将 add_months 注册为 UDF? - Chitral Verma
任何其他方法也可以,我找不到任何与将月份添加到日期相关的内容。 - anurag
@Chitral:add_months是Spark中预定义的函数。 - anurag
看看 user8371915 的回答。那应该可以解决问题。至于为什么你会出现错误,是因为 add_columns 需要一个日期列参数和一个要添加的月份的整数值。你在这里给了两个列,这就是为什么你会得到这个错误。 - Sivaprasanna Sethuraman
显示剩余2条评论
3个回答

15

我会使用expr

from pyspark.sql.functions import expr

df = spark.createDataFrame(
    [("2015-06-23", 5), ("2016-07-20", 7)],
    ("data_date", "months_to_add")
).select(to_date("data_date").alias("data_date"), "months_to_add")

df.withColumn("new_data_date", expr("add_months(data_date, months_to_add)")).show()

+----------+-------------+-------------+
| data_date|months_to_add|new_data_date|
+----------+-------------+-------------+
|2015-06-23|            5|   2015-11-23|
|2016-07-20|            7|   2017-02-20|
+----------+-------------+-------------+

看起来很整洁。但是,它使用的是哪个add_months函数?我认为它不是pyspark.sql.functions.add_months。 - Sivaprasanna Sethuraman
@DeSanta 无论如何,这都是一个Hive函数。 - philantrovert
@user8371915 非常感谢,这个方法非常有效! - anurag
@philantrovert 我在pyspark shell中尝试了一下。没有导入add_months,它对我来说是有效的。而且expr接受一个字符串。我想知道它是如何在没有导入的情况下工作的。 - Sivaprasanna Sethuraman
1
@DeSanta 这是一个Hive函数。你也可以使用sqlContext.sql来执行它。这就是为什么。 - philantrovert
收到了,谢谢 :) - Sivaprasanna Sethuraman

-2

由于函数 add_months 需要整数作为第二个参数,而您正在传递列值,因此出现错误。

请尝试使用以下语句并检查

df.withColumn("new_data_date",add_months(col("data_date"), df.first()[1])).show()

希望能有所帮助。

祝好,

尼拉吉


-2

尝试以下代码。它对我有效。

from pyspark.sql import Row
l =  [("2015-06-23", 5),("2016-07-20", 7)]
rdd1 = sc.parallelize(l)
row_rdd = rdd1.map(lambda x: Row(x[0], x[1]))
df = sqlContext.createDataFrame(row_rdd,['data_date', 'months_to_add'])
df.withColumn("new_data_date",add_months(col("data_date"), df.first()[1])).show()

敬礼, 尼拉吉


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接