使用Python的Spark:如何解决阶段x包含一个非常大(xxx KB)的任务。最大推荐任务大小为100 KB。

46

我刚刚创建了一个Python列表,其中包含range(1,100000)

使用SparkContext执行了以下步骤:

a = sc.parallelize([i for i in range(1, 100000)])
b = sc.parallelize([i for i in range(1, 100000)])

c = a.zip(b)

>>> [(1, 1), (2, 2), -----]

sum  = sc.accumulator(0)

c.foreach(lambda (x, y): life.add((y-x)))

报出以下警告:

ARN TaskSetManager:第3阶段包含一个非常大(4644 KB)的任务。建议的最大任务大小为100 KB。

如何解决此警告?有没有办法处理大小?还会影响大数据的时间复杂度吗?


3
首先,“life”到底是什么?你是指“和”的累加器吗?无论如何,在这里都不应该成为问题。有关详细信息,请参见此处链接:http://mail-archives.us.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAJgQjQ_87xkpACXFpoOn0iuLZ2Q00qwWTLHmZfy9BAhKqDkv0A@mail.gmail.com%3E。 - zero323
3个回答

19

一般的想法是PySpark创建与执行器数量相同的Java进程,然后将数据传输到每个进程。如果进程太少,会在Java堆空间上发生内存瓶颈。

在您的情况下,具体错误是使用sc.parallelize([...])创建的RDD未指定分区数(参数numSlices,请参见文档)。而RDD默认分区数过小(可能由单个分区组成)。

为解决此问题,只需指定所需的分区数:

a = sc.parallelize([...], numSlices=1000)   # and likewise for b

随着您指定的切片数量越来越多,您将会看到警告消息中显示的大小减少。增加切片数量直到不再收到警告消息。例如,获得

Stage 0 contains a task of very large size (696 KB). The maximum recommended task size is 100 KB

意味着您需要指定更多的切片。


另一个有用的提示,用于解决内存问题(但这与警告消息无关):默认情况下,每个执行器可用的内存为1 GB左右。您可以通过命令行指定更大的量,例如使用 --executor-memory 64G


1
我该如何确定最佳的切片数量?从你的例子中看来,需要使用7个切片才能将每个切片大小控制在100 KB以下。但是,我该如何确定任务的大小呢? - user2361174
@user2361174 正如你所写的,通过警告信息(blabla 包含一个非常大的任务..),你会知道大小是否过高。在尝试之前,我不知道其他获取此信息的方法。 - Jealie
3
如果你import sys, math,那么n = math.ceil(sys.getsizeof(your_list) / 102400)将是将所有列表切片并使它们保持在100KB以下所需的最小切片数。 - Luis Da Silva

15

Spark本地在任务传输期间会默认复制每个变量的副本。对于这些变量的大型大小,您可能希望使用广播变量

如果您仍然遇到大小问题,则可能应将此数据作为RDD本身。


嗨,@Hitesh Dharamdasani,有没有办法更改SPARK配置文件以最大化任务大小? - sara
抱歉我迟到了。https://spark.apache.org/docs/1.2.0/tuning.html 在“数据序列化”部分中有一些建议,但通常不推荐使用。我尝试过kyro序列化程序并取得了一些成功,但也不值得吹嘘。广播变量是更好的选择。 - Hitesh Dharamdasani
如果变量不可序列化(因此必须使用像mapPartitions这样的运算符),该怎么办?我认为它不能包装在广播变量中。 - bachr
2
只是为了更清楚地解释这个答案。在提供的示例中超过100 KB的变量是[i for i in range(1, 100000)] - leo9r

6

5
在玩具示例中使用 sc.range 而不是 range 是可行的,但它无法解决更普遍的问题(即Python和Java之间如何传递数据)。 - Jealie

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接