Spark窗口性能问题

5

我有一个帕奎特数据框,其结构如下:

  1. ID 字符串
  2. DATE 日期
  3. 480个其他的双精度浮点类型特征列

我需要用它们对应的加权移动平均值替换每个480个特征列,窗口大小为250。 最初,我正在尝试使用以下简单代码来处理单个列:

var data = sparkSession.read.parquet("s3://data-location")
var window = Window.rowsBetween(-250, Window.currentRow - 1).partitionBy("ID").orderBy("DATE")
data.withColumn("Feature_1", col("Feature_1").divide(avg("Feature_1").over(window))).write.parquet("s3://data-out")

输入数据包含2000万行,每个ID大约有4-5000个相关日期。我已经在AWS EMR集群(m4.xlarge实例)上运行了此操作,并获得了以下结果:
  • 4个执行器X 4个核心X 10 GB + 1 GB用于yarn开销(因此每个任务为2.5GB,16个并发运行任务),花费14分钟
  • 8个执行器X 4个核心X 10GB + 1 GB用于yarn开销(因此每个任务为2.5GB,32个并发运行任务),花费8分钟
我调整了以下设置,希望能缩短总时间:
  • spark.memory.storageFraction 0.02
  • spark.sql.windowExec.buffer.in.memory.threshold 100000
  • spark.sql.constraintPropagation.enabled false
第二个设置有助于避免日志中出现的一些spilling,但没有任何实际性能提升。
我不明白仅20百万条记录需要这么长时间。我知道计算加权移动平均值需要进行20M X 250(窗口大小)个平均值和除法,但是使用16个核心(第一次运行),我不明白为什么需要这么长时间。我无法想象处理其余479个特征列需要多长时间!
我还尝试增加默认shuffle分区,设置:
  • spark.sql.shuffle.partitions 1000
但即使有1000个分区,也无法缩短时间。尝试在调用窗口聚合之前按ID和日期对数据进行排序,但没有任何好处。
是否有任何方法可以改善此问题,或者窗口函数通常在我的用例中运行缓慢?这仅仅是2000万行,远远不及Spark可以处理的其他类型的工作负载。
1个回答

0

您的数据集大小约为70GB。 如果我理解正确,对于每个ID,它会按日期对所有记录进行排序,然后取前250条记录进行平均值计算。由于需要在400多列上应用此操作,我建议在创建parquet文件时尝试使用桶分配(bucketing)以避免洗牌(shuffling)。尽管编写桶分配(parquet)文件需要相当长的时间,但对于480个列的推导,可能不需要8分钟*480执行时间。

请尝试在创建parquet文件时使用桶分配(bucketing)或重新划分(repartition)和排序(sortwithin),让我知道是否有效。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接