如何解决:在Spark中处理非常大的任务

6

这里是我在Spark上运行的Python代码,用于对数据进行分析。我能够在小型数据集上运行以下程序。但是当处理大型数据集时,它会显示“阶段1包含一个非常大(17693 KB)的任务。最大推荐任务大小为100 KB”。

import os
import sys
import unicodedata
from operator import add

try:
    from pyspark import SparkConf
    from pyspark import SparkContext
except ImportError as e:
    print ("Error importing Spark Modules", e)
    sys.exit(1)

def tokenize(text):
    resultDict = {}
    text = unicodedata.normalize('NFKD', text).encode('ascii','ignore')

    str1= text[1]
    str2= text[0]

    arrText= text.split(str1)

    ss1 = arrText[0].split("/")

    docID = ss1[0].strip()

    docName = ss[1].strip()

    resultDict[docID+"_"+docName] = 1

    return resultDict.iteritems()

sc=SparkContext('local')
textfile = sc.textFile("path to my data")
fileContent = textfile.flatMap(tokenize)
rdd = sc.parallelize(fileContent.collect())
rdd= rdd.map(lambda x: (x[0], x[1])).reduceByKey(add)
print rdd.collect()
#reduceByKey(lambda a,b: a+b)
rdd.coalesce(1).saveAsTextFile("path to result")

这里我发出一个小警告:任务在此之后不会运行。有人能帮我解决这个问题吗?

16/06/10 19:19:58 WARN TaskSetManager: Stage 1 contains a task of very large size (17693 KB). The maximum recommended task size is 100 KB.
16/06/10 19:19:58 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 5314, localhost, partition 0,PROCESS_LOCAL, 18118332 bytes)
16/06/10 19:19:58 INFO Executor: Running task 0.0 in stage 1.0 (TID 5314)
16/06/10 19:43:00 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:43480 in memory (size: 3.9 KB, free: 511.1 MB)
16/06/10 19:43:00 INFO ContextCleaner: Cleaned accumulator 2

5
这个警告基本上表示你的转换函数太大了。由于你说任务在这之后没有继续,我会寻找一些可能导致内存占用增加的无限循环。 - z-star
你的意思是说我们需要在编译时使用标志 --executor-memory 来增加内存? - Baradwaj Aryasomayajula
1个回答

3
当Spark序列化任务时,它会递归地序列化完整的闭包上下文。在这种情况下,逻辑上的罪魁祸首似乎是您在tokenize中使用的unicodedata。我可能错了,但我没有看到代码中有其他重型数据结构。(注意,我通常使用Scala的Spark,我的Python很生疏)。我想知道该库是否由不可用于执行器节点的重型数据结构支持。
处理此类问题的典型模式如下:
  1. 确保所有库均可在执行器节点上使用。

  2. 使用广播变量将重型数据结构分发到执行器。

无关紧要的是,除非您将其用作调试工具,否则您进行了大量不必要的数据收集以将所有数据传回驱动程序的collect。可以链接变换:
sc.textFile(...).flatMap(...).map(...).reduceByKey(add).coalesce(1).saveAsTextFile(...)

我理解这个代码... 我正在运行它,没有使用'collect'。关于编码,我必须对大文本进行编码才能访问它。顺便说一下,所有的映射函数都已完成... 只是在reduceByKey处停止了... 所以我怀疑是否存在'unicodedata'的问题。 - Baradwaj Aryasomayajula
顺便提一下,我是在单节点上运行它的...这可能是问题的原因吗... - Baradwaj Aryasomayajula
你说得对,问题不应该出在 unicodedata 上。应该是 add,但这并没有太多意义。在本地模式下运行只意味着可能会有额外的 RAM 约束,但异常提示指出问题特别涉及基于闭包的序列化大小,而不是内存处理大小。 - Sim
我尝试按照你说的方式进行更改,但没有成功。而且“add”是一个内置的Spark函数。所以它可能出了什么问题呢... - Baradwaj Aryasomayajula
将您的代码放入Databricks社区版笔记本中,并在那里分享链接。没有完整的环境,无法调试闭包泄漏问题。 - Sim

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接