连续行之间的日期差异 - Pyspark数据框架

13

我有一个以下结构的表格

USER_ID     Tweet_ID                 Date
  1           1001       Thu Aug 05 19:11:39 +0000 2010
  1           6022       Mon Aug 09 17:51:19 +0000 2010
  1           1041       Sun Aug 19 11:10:09 +0000 2010
  2           9483       Mon Jan 11 10:51:23 +0000 2012
  2           4532       Fri May 21 11:11:11 +0000 2012
  3           4374       Sat Jul 10 03:21:23 +0000 2013
  3           4334       Sun Jul 11 04:53:13 +0000 2013

基本上我想做的是使用PysparkSQL查询,计算用户ID号相同的连续记录之间的日期差(以秒为单位)。预期结果是:

1      Sun Aug 19 11:10:09 +0000 2010 - Mon Aug 09 17:51:19 +0000 2010     839930
1      Mon Aug 09 17:51:19 +0000 2010 - Thu Aug 05 19:11:39 +0000 2010     340780
2      Fri May 21 11:11:11 +0000 2012 - Mon Jan 11 10:51:23 +0000 2012     1813212
3      Sun Jul 11 04:53:13 +0000 2013 - Sat Jul 10 03:21:23 +0000 2013     5510
3个回答

15

另一种方式可能是:

from pyspark.sql.functions import lag
from pyspark.sql.window import Window

df.withColumn("time_intertweet",(df.date.cast("bigint") - lag(df.date.cast("bigint"), 1)
.over(Window.partitionBy("user_‌​id")
.orderBy("date")‌​))
.cast("bigint"))

7

编辑 感谢 @cool_kid

@Joesemy的回答非常好,但对我无效,因为cast("bigint")引发了错误。所以我使用了 pyspark.sql.functions模块 中的 datediff 函数,并且像这样运行它就可以了:

from pyspark.sql.functions import *
from pyspark.sql.window import Window

df.withColumn("time_intertweet", datediff(df.date, lag(df.date, 1)
    .over(Window.partitionBy("user_‌​id")
    .orderBy("date")‌​)))

1
这个问题解决了,只需要将“-”改成“,”。同时,需要导入以下两个库:from pyspark.sql import functions as f 和 from pyspark.sql.window import Window。然后更新f.datediff和f.lag即可。非常感谢LePuppy! - cool_kid
2
这个代码不是按秒计算日期差异,而是按天数计算:https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.functions.datediff - AndrewMehrmann

4

就像这样:

df.registerTempTable("df")

sqlContext.sql("""
     SELECT *, CAST(date AS bigint) - CAST(lag(date, 1) OVER (
              PARTITION BY user_id ORDER BY date) AS bigint) 
     FROM df""")

4
另一种方式:df.withColumn("time_intertweet",(df.date.cast("bigint") - lag(df.date.cast("bigint"),1).over(Window.partitionBy("user_id").orderBy("date"))).cast("bigint")) 翻译:另一种方法是,使用df.withColumn函数并添加一个新列"time_intertweet",其中包含计算结果:对于每个用户(user_id),按照日期(date)排序,将当前日期减去前一行的日期,然后将结果转换为整数类型。 - Josemy
@Joss,你能不能把这个加到回答里面呢?我已经将它转换成了维基百科。谢谢。 - user6022341

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接