给定一个数据框,我试图计算过去30天内看到了多少次电子邮件 ID。我的函数的主要逻辑如下:
val new_df = df
.withColumn("transaction_timestamp", unix_timestamp($"timestamp").cast(LongType))
val winSpec = Window
.partitionBy("email")
.orderBy(col("transaction_timestamp"))
.rangeBetween(-NumberOfSecondsIn30Days, Window.currentRow)
val resultDF = new_df
.filter(col("condition"))
.withColumn("count", count(col("email")).over(winSpec))
配置文件:
spark.executor.cores=5
因此,我能看到有5个阶段使用了窗口函数,其中一些阶段很快就完成了(几秒钟内),而有两个阶段甚至在3小时内都没有完成,被卡在了最后几个任务上(进展非常缓慢):
对我来说,这是数据倾斜的问题。如果我从数据集中删除所有包含5个最高频率的email
id的行,则作业很快就会完成(不到5分钟)。
如果我尝试在窗口partitionBy中使用其他键,则作业会在几分钟内完成:
Window.partitionBy("email", "date")
但是,如果我这样做,显然会导致错误的计数计算,这不是一种可接受的解决方案。
我已经尝试过各种其他 Spark 设置,增加了更多内存、核心、并行度等等,但似乎都没有帮助。
Spark 版本:2.2
当前的 Spark 配置:
-executor-memory: 100G
-executor-cores: 5
-driver memory: 80G
-spark.executor.memory=100g
使用每台机器具备 16 核和 128 GB 内存,并且最多可以使用 500 个节点。
如何正确解决这个问题?
更新:为了提供更多上下文信息,这里是原始数据框架和相应的计算数据框架。
val df = Seq(
("a@gmail.com", "2019-10-01 00:04:00"),
("a@gmail.com", "2019-11-02 01:04:00"),
("a@gmail.com", "2019-11-22 02:04:00"),
("a@gmail.com", "2019-11-22 05:04:00"),
("a@gmail.com", "2019-12-02 03:04:00"),
("a@gmail.com", "2020-01-01 04:04:00"),
("a@gmail.com", "2020-03-11 05:04:00"),
("a@gmail.com", "2020-04-05 12:04:00"),
("b@gmail.com", "2020-05-03 03:04:00")
).toDF("email", "transaction_timestamp")
val expectedDF = Seq(
("a@gmail.com", "2019-10-01 00:04:00", 1),
("a@gmail.com", "2019-11-02 01:04:00", 1), // prev one falls outside of last 30 days win
("a@gmail.com", "2019-11-22 02:04:00", 2),
("a@gmail.com", "2019-11-22 05:04:00", 3),
("a@gmail.com", "2019-12-02 03:04:00", 3),
("a@gmail.com", "2020-01-01 04:04:00", 1),
("a@gmail.com", "2020-03-11 05:04:00", 1),
("a@gmail.com", "2020-04-05 12:04:00", 2),
("b@gmail.com", "2020-05-03 03:04:00", 1) // new email
).toDF("email", "transaction_timestamp", count")