线程 # 正在将 _ GB 的排序数据溢出到磁盘上

5

我正在尝试编写一个ETL过程,在联合之前合并两个数据集。我为每个数据集添加了一列,新鲜的数据集得到2,旧的数据集得到1,然后如果行具有重复的主键,则删除在旧/新列中有1的行。我已经尝试以多种方式编写此代码,最近采用以下方法:

orderBy(keys, desc(old/new)).dropDuplicates(keys)

但是在大型数据集上,我总是会遇到巨大的减速现象,并出现以下消息:

16/09/21 20:31:45 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (0  time so far)
16/09/21 20:32:00 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (1  time so far)
16/09/21 20:32:16 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (2  times so far)
16/09/21 20:32:31 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (3  times so far)
16/09/21 20:32:47 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (4  times so far)
16/09/21 20:33:02 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (5  times so far)
16/09/21 20:33:18 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (6  times so far)
16/09/21 20:33:33 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (7  times so far)
16/09/21 20:33:49 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (8  times so far)
16/09/21 20:34:04 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (9  times so far)
16/09/21 20:34:19 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (10  times so far)
16/09/21 20:34:35 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (11  times so far)
16/09/21 20:34:50 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (12  times so far)
16/09/21 20:35:06 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (13  times so far)
16/09/21 20:35:21 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (14  times so far)
16/09/21 20:35:37 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (15  times so far)
16/09/21 20:35:52 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (16  times so far)
16/09/21 20:36:07 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (17  times so far)
16/09/21 20:36:23 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (18  times so far)
16/09/21 20:36:38 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (19  times so far)
16/09/21 20:36:53 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (20  times so far)
16/09/21 20:37:09 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (21  times so far)
16/09/21 20:37:24 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (22  times so far)
16/09/21 20:37:40 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (23  times so far)
16/09/21 20:37:55 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (24  times so far)
16/09/21 20:38:10 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (25  times so far)
16/09/21 20:38:25 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (26  times so far)
16/09/21 20:38:41 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (27  times so far)
16/09/21 20:38:56 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (28  times so far)
16/09/21 20:39:25 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (0  time so far)
16/09/21 20:39:45 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (1  time so far)
16/09/21 20:40:05 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (2  times so far)
16/09/21 20:40:26 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (3  times so far)
16/09/21 20:40:46 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (4  times so far)
16/09/21 20:41:07 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (5  times so far)
16/09/21 20:41:27 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (6  times so far)
16/09/21 20:41:47 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (7  times so far)
16/09/21 20:42:07 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (8  times so far)
16/09/21 20:42:28 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (9  times so far)
16/09/21 20:42:49 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (10  times so far)
16/09/21 20:43:09 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (11  times so far)
16/09/21 20:43:30 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (12  times so far)
16/09/21 20:43:50 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (13  times so far)
16/09/21 20:44:11 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (14  times so far)
16/09/21 20:44:31 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (15  times so far)
16/09/21 20:44:52 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (16  times so far)
16/09/21 20:45:13 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (17  times so far)
16/09/21 20:45:33 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (18  times so far)
16/09/21 20:45:53 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (19  times so far)
16/09/21 20:46:14 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (20  times so far)
16/09/21 20:46:34 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (21  times so far)
16/09/21 20:46:54 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (22  times so far)
16/09/21 20:47:14 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (23  times so far)
16/09/21 20:47:34 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (24  times so far)
16/09/21 20:47:54 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (25  times so far)
16/09/21 20:48:14 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (26  times so far)
16/09/21 20:48:34 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (27  times so far)
16/09/21 20:48:54 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (28  times so far)

经检查Spark UI,只有一个线程在加班工作,而其余线程已经完成。是否可能将其分散在各个线程之间?

enter image description here
1个回答

1
你的处理方式会放大与数据倾斜相关的问题。因为你首先按键和指示变量重新排序数据,所以会先洗牌数据,可能会创建高度不平衡的分区。此后应用的任何缩减都无法弥补这一点。
至少有两种方法可以实现相同的结果,同时充分利用映射端的缩减。我在 SPARK DataFrame: select the first row of each group 的答案中解释了这两种方法,简单重申如下:
- 您可以使用结构体排序来选择每个组的最小/最大行。 - 您可以使用静态类型的 Dataset,其中包括 groupByKey,然后是 reduceGroups。

如果我在主键上进行orderBy,那么我怎么可能会创建一个偏斜呢?我尝试了你的Window.partitionBy方法并遇到了问题,但我会探索你提供的其他替代方案。 - Brady Auen
窗口函数将做几乎相同的事情(首先重新分区),因此在这种情况下不是您想要做的事情。orderBy或多或少等同于partitionBy,因此如果某些键特别常见,则会导致非均匀分布。在这种情况下,您需要的是有效的映射端组合。它可能无法解决问题,但至少可以提高整体性能。 - zero323
没错,但在这种情况下,我的键是主键/复合键,所以它们应该都是唯一的,但我仍然遇到了问题。 - Brady Auen
1
@zero323 我正在尝试从Oracle读取数据并存储到Parquet文件中,同时在存储Parquet时我使用了以下代码:df .write.format("parquet") .mode("overwrite") .partitionBy("year","month") .save(parquet_file)但是这里有很多洗牌操作,数据也会溢出到100GB的磁盘上,而且在30个执行器中只有一个执行器需要很长时间...如何并行化处理呢? - BdEngineer
@BdEngineer 我遇到了同样的情况,在执行saveAsTable时,大量数据会溢出到磁盘上,而且只有一个执行器在工作。你能解决这个问题吗?我知道这是4年前的事情了,希望你还记得。 - Himanshu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接