Spark窗口分区函数要花费很长时间才能完成

6

给定一个数据框,我试图计算过去30天内看到了多少次电子邮件 ID。我的函数的主要逻辑如下:

val new_df = df
  .withColumn("transaction_timestamp", unix_timestamp($"timestamp").cast(LongType))

val winSpec = Window
  .partitionBy("email")
  .orderBy(col("transaction_timestamp"))
  .rangeBetween(-NumberOfSecondsIn30Days, Window.currentRow)

val resultDF = new_df
  .filter(col("condition"))
  .withColumn("count", count(col("email")).over(winSpec))

配置文件:
spark.executor.cores=5

因此,我能看到有5个阶段使用了窗口函数,其中一些阶段很快就完成了(几秒钟内),而有两个阶段甚至在3小时内都没有完成,被卡在了最后几个任务上(进展非常缓慢):

对我来说,这是数据倾斜的问题。如果我从数据集中删除所有包含5个最高频率的email id的行,则作业很快就会完成(不到5分钟)。

如果我尝试在窗口partitionBy中使用其他键,则作业会在几分钟内完成:

 Window.partitionBy("email", "date")

但是,如果我这样做,显然会导致错误的计数计算,这不是一种可接受的解决方案。
我已经尝试过各种其他 Spark 设置,增加了更多内存、核心、并行度等等,但似乎都没有帮助。
Spark 版本:2.2
当前的 Spark 配置:
-executor-memory: 100G
-executor-cores: 5
-driver memory: 80G
-spark.executor.memory=100g
使用每台机器具备 16 核和 128 GB 内存,并且最多可以使用 500 个节点。
如何正确解决这个问题?
更新:为了提供更多上下文信息,这里是原始数据框架和相应的计算数据框架。
 val df = Seq(
      ("a@gmail.com", "2019-10-01 00:04:00"),
      ("a@gmail.com", "2019-11-02 01:04:00"), 
      ("a@gmail.com", "2019-11-22 02:04:00"),
      ("a@gmail.com", "2019-11-22 05:04:00"),
      ("a@gmail.com", "2019-12-02 03:04:00"),
      ("a@gmail.com", "2020-01-01 04:04:00"),
      ("a@gmail.com", "2020-03-11 05:04:00"),
      ("a@gmail.com", "2020-04-05 12:04:00"),
      ("b@gmail.com", "2020-05-03 03:04:00")  
    ).toDF("email", "transaction_timestamp")


val expectedDF = Seq(
      ("a@gmail.com", "2019-10-01 00:04:00", 1),
      ("a@gmail.com", "2019-11-02 01:04:00", 1), // prev one falls outside of last 30 days win
      ("a@gmail.com", "2019-11-22 02:04:00", 2),
      ("a@gmail.com", "2019-11-22 05:04:00", 3),
      ("a@gmail.com", "2019-12-02 03:04:00", 3),
      ("a@gmail.com", "2020-01-01 04:04:00", 1),
      ("a@gmail.com", "2020-03-11 05:04:00", 1),
      ("a@gmail.com", "2020-04-05 12:04:00", 2),
      ("b@gmail.com", "2020-05-03 03:04:00", 1) // new email
).toDF("email", "transaction_timestamp", count") 

你能告诉我你机器的规格吗?总核数和内存大小? - Sanket9394
3个回答

4
您说得对,这是一个数据倾斜问题,减小窗口大小会有很大帮助。如果只需要获取最近30天的信息,则不需要一直追溯到时间的开始。但是,如果构建一个具有时间索引的窗口,则在每个窗口的开头计算将会出错,因为它无法访问前一个窗口。
我建议的是构建一个索引,每30天递增,并且使用两个重叠的大小为60天的窗口,如下图所示:

overlapping windows

为了理解这个过程,让我们考虑一个数据点,如图所示,其index=2。如果您有一个大小为30天的窗口,它需要访问其窗口内和前一个窗口内的数据。这是不可能的。这就是为什么我们建立更大的窗口,以便我们可以访问所有数据。如果我们考虑win1,我们与30天大小的索引一样面临同样的问题。然而,如果我们考虑win2,所有数据都在索引1的窗口中可用。
对于索引为3的点,我们将使用win1。对于索引为4的点,我们将使用win2等。基本上,对于偶数索引,我们使用win2。对于奇数索引,我们使用win1。这种方法将大大减少最大分区大小,从而减少一次处理的最大数据量。
代码只是上述内容的翻译:
val winSize = NumberOfSecondsIn30Days

val win1 = Window
    .partitionBy("email", "index1")
    .orderBy(col("transaction_timestamp"))
    .rangeBetween(-winSize, Window.currentRow)
val win2 = Window
    .partitionBy("email", "index2")
    .orderBy(col("transaction_timestamp"))
    .rangeBetween(-winSize, Window.currentRow)

val indexed_df = new_df
    // the group by is only there in case there are duplicated timestamps,
    // so as to lighten the size of the windows
    .groupBy("email", "transaction_timestamp")
    .count()
    .withColumn("index",
        'transaction_timestamp / winSize cast "long")
    .withColumn("index1",
        ('transaction_timestamp / (winSize * 2)) cast "long")
    .withColumn("index2",
        (('transaction_timestamp + winSize) / (winSize * 2)) cast "long")

val result = indexed_df
    .withColumn("count", when(('index mod 2) === 0, sum('count) over win2)
                                      .otherwise(sum('count) over win1))

2
顺便提一下,你的示例中有一个错误。对于这个时间戳“2020-01-01 04:04:00”,你期望得到“2”。但是前一个时间戳是“2019-12-02 03:04:00”,比它早30天1小时。因此它超出了时间窗口,正确的结果应该是“1”。 - Oli
这个技巧旨在处理某些电子邮件比其他电子邮件包含更多数据的情况,但记录在时间上分布相当均匀。如果不是这种情况,我们可能需要额外的技巧,可能是连接操作。这种方法需要更加精细,那是肯定的。 - Oli
1
关于问题1,'index 表示名为 index 的列,例如 col("index")$"index"。关于问题2,要做与另一个问题相同的事情,如果我理解正确,将两个 count('*) 替换为 count(when(condition, '*)) - Oli
1
你能提供 indexed_df.join(df.groupBy("email").count.orderBy('count desc).limit(1).select("email"), Seq("email")).groupBy("index").count.orderBy('count desc).show 的结果吗?目标是识别具有最大数据量的电子邮件的月份分布。 - Oli
让我们在聊天中继续这个讨论 - ic10503
显示剩余10条评论

3

您的某些分区可能太大了,这是因为有些电子邮件中一个月里的数据太多了。

要解决这个问题,您可以创建一个只包含电子邮件和时间戳的新数据框。然后,您按电子邮件和时间戳进行分组,计算行数并在希望处理的数据量更少的情况下计算窗口。如果时间戳往往重复,即如果df.count远大于df.select("email", "timestamp").distinct.count,则计算速度将加快。如果不是这种情况,则可以截断时间戳以付出一定精度的代价。这样,您将不再计算最近30天(加上或减去一秒,因为时间戳是以秒为单位的)内发生的事件数量,而是根据您的需要计算最近一分钟、一小时甚至一天内发生的事件数量。您会失去一点精度,但可以大大加快计算速度。而且越精确,您得到的速度就越快。

代码如下:

// 3600 means hourly precision.
// Set to 60 for minute precision, 1 for second precision, 24*3600 for one day.
// Note that even precisionLoss = 1 might make you gain speed depending on
// the distribution of your data
val precisionLoss = 3600 
val win_size = NumberOfSecondsIn30Days / precisionLoss

val winSpec = Window
  .partitionBy("email")
  .orderBy("truncated_timestamp")
  .rangeBetween(-win_size, Window.currentRow)

val new_df = df.withColumn("truncated_timestamp",
                      unix_timestamp($"timestamp") / 3600 cast "long")

val counts = new_df
  .groupBy("email", "truncated_timestamp")
  .count
  .withColumn("count", sum('count) over winSpec)

val result = new_df
  .join(counts, Seq("email", "truncated_timestamp"))

非常感谢@Oli,这个解决方案真的很棒,但我仍然想知道为什么在失去一些精度时运行速度更快。是因为它能够进一步分区数据吗?我完全找不到关于窗口函数如何在内部工作的文档。 - ic10503
从物理查询计划来看,与没有精度损失时相比,它似乎需要查找的行数要小得多。这对我来说是有道理的。 - ic10503
1
Windows的工作方式在底层是相当简单的。当您通过电子邮件进行分区时,数据会被重新分区,因此会有一个昂贵的洗牌过程。然后,每个电子邮件都与一个任务相关联,由一台机器上的一个核心处理。这些任务执行窗口计算。因此,如果某个电子邮件的数据比其他电子邮件多得多,相应的任务将需要很长时间,整个作业也将如此,只等待一个执行者完成他的工作。 - Oli
1
这就是为什么我的第一个想法是减小窗口的大小,但结果令人失望。在这里,窗口计算之所以更快,有两个原因。数据框具有较少的列,因此记录较小(我们只保留了两列,也许您有很多列,这使得计算变得更慢)。但主要是通过首先聚合靠近的时间戳来减小窗口的大小。我们减小了窗口的大小,也减小了偏差。 - Oli
你好 :) 你已经设置了悬赏,顺便说一下,但是你能否接受其中一个答案,以便我们可以考虑关闭该线程? - Oli
1
如果您有类似这样的问题,请随时联系我。虽然第一次尝试未能提高性能,但我真的很喜欢解决这个问题的过程;-) - Oli

2

我们仍然可以避免使用Window

对于上述提到的df

val df2 = df.withColumn("timestamp", unix_timestamp($"transaction_timestamp").cast(LongType))

val df3 = df2.withColumnRenamed("timestamp","timestamp_2").drop("transaction_timestamp")

val finalCountDf = df2.join(df3,Seq("email"))
.withColumn("is_within_30", when( $"timestamp" - $"timestamp_2" < NumberOfSecondsIn30Days && $"timestamp" - $"timestamp_2" > 0 , 1).otherwise(0))
.groupBy("email","transaction_timestamp").agg(sum("is_within_30") as "count")
.withColumn("count",$"count"+1)

finalCountDf.orderBy("transaction_timestamp").show
/*
+-----------+---------------------+-----+
|      email|transaction_timestamp|count|
+-----------+---------------------+-----+
|a@gmail.com|  2019-10-01 00:04:00|    1|
|a@gmail.com|  2019-11-02 01:04:00|    1|
|a@gmail.com|  2019-11-22 02:04:00|    2|
|a@gmail.com|  2019-11-22 05:04:00|    3|
|a@gmail.com|  2019-12-02 03:04:00|    3|
|a@gmail.com|  2020-01-01 04:04:00|    1|
|a@gmail.com|  2020-03-11 05:04:00|    1|
|a@gmail.com|  2020-04-05 12:04:00|    2|
|b@gmail.com|  2020-05-03 03:04:00|    1|
+-----------+---------------------+-----+
*/

说明:

  • 根据“电子邮件”制作时间戳对(通过电子邮件连接)
  • 比较每个对并检查它是否在最近30天内:如果是,则将其标记为1,否则为0
  • 按“电子邮件”和“transaction_timestamp”总计数

假设:(电子邮件,transaction_timestamp)是不同的。如果不是,则可以通过添加monotonicallyIncreasingId来处理


谢谢@Sanket9394。问题是我需要计算每行的计数。我不确定是否可以在不使用像你所做的窗口函数的情况下完成这个操作。请查看我添加到问题陈述中的数据框。 - ic10503
此外,您正在尝试查找最近30天内的哪些行是从当前时间开始的,但实际上应该是从交易时间戳开始。 - ic10503
我已经编辑了我的回答。你现在可以检查一下。这将确保你的5个核心获得相等的负载。 - Sanket9394
另外,如果您有更多的核心,请使用它们。增加核心数量将有助于减少此方法的运行时间,但对于偏斜数据的窗口来说可能会成为瓶颈。 - Sanket9394
@Sanket9394 谢谢!我想知道为什么你说窗口函数不会随着更多资源而扩展,但连接会。这是 Spark 有某些架构限制吗?我能想到的唯一一件事就是 Spark 对内部数据结构(如 shuffle 块等)施加的 2GB 限制。 - ic10503
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接