PySpark如何从一个TimeStampType列向DataFrame中添加一列?

18

我有一个类似这样的DataFrame。 我想在date_time字段的日期上进行操作。

root
 |-- host: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- date_time: timestamp (nullable = true)

我尝试添加一列来提取日期,但迄今为止我的尝试都失败了。

df = df.withColumn("day", df.date_time.getField("day"))

org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type TimestampType;

这也失败了

df = df.withColumn("day", df.select("date_time").map(lambda row: row.date_time.day))

AttributeError: 'PipelinedRDD' object has no attribute 'alias'

你有任何想法吗,可以完成这个任务?

2个回答

33
你可以使用简单的map:
df.rdd.map(lambda row:
    Row(row.__fields__ + ["day"])(row + (row.date_time.day, ))
)

另一种选择是注册一个函数并运行 SQL 查询:

sqlContext.registerFunction("day", lambda x: x.day)
sqlContext.registerDataFrameAsTable(df, "df")
sqlContext.sql("SELECT *, day(date_time) as day FROM df")

最后,您可以像这样定义自定义函数:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

day = udf(lambda date_time: date_time.day, IntegerType())
df.withColumn("day", day(df.date_time))

编辑:

实际上,如果您使用原始的SQL语句,在Spark 1.4中day函数已经定义好了,因此您可以省略UDF注册。它还提供了许多不同的日期处理函数,包括:

还可以使用简单的日期表达式,例如:

current_timestamp() - expr("INTERVAL 1 HOUR")

这意味着您可以构建相对复杂的查询,而无需将数据传递给Python。例如:

df =  sc.parallelize([
    (1, "2016-01-06 00:04:21"),
    (2, "2016-05-01 12:20:00"),
    (3, "2016-08-06 00:04:21")
]).toDF(["id", "ts_"])

now = lit("2016-06-01 00:00:00").cast("timestamp") 
five_months_ago = now - expr("INTERVAL 5 MONTHS")

(df
    # Cast string to timestamp
    # For Spark 1.5 use cast("double").cast("timestamp")
    .withColumn("ts", unix_timestamp("ts_").cast("timestamp"))
    # Find all events in the last five months
    .where(col("ts").between(five_months_ago, now))
    # Find first Sunday after the event
    .withColumn("next_sunday", next_day(col("ts"), "Sun"))
    # Compute difference in days
    .withColumn("diff", datediff(col("ts"), col("next_sunday"))))

有很多列,我只想再添加一列。使用map方法可能太繁琐,需要列出所有现有的列。我将尝试使用register函数的方式。谢谢。 - Wai Yip Tung
您IP地址为143.198.54.68,由于运营成本限制,当前对于免费用户的使用频率限制为每个IP每72小时10次对话,如需解除限制,请点击左下角设置图标按钮(手机用户先点击左上角菜单按钮)。 - zero323
1
定义UDF似乎是我迄今为止找到的最干净的方法。已添加到答案中。 - zero323

0

enter image description here

res=df.withColumn("dayofts",dayofmonth("ts_"))
from pyspark.sql import functions as F
res=df.withColumn("dayofts",F.dayofmonth("ts_"))
res.show()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接