PySpark序列化EOFError

36

我正在将一个CSV文件读入Spark DataFrame,并对其执行机器学习操作。然而,我一直遇到Python序列化EOFError的问题 - 有任何想法吗?我曾经认为这可能是内存问题 - 即文件超出了可用RAM - 但是大幅减小DataFrame的大小并没有防止EOF错误。

以下是玩具代码和错误信息。

#set spark context
conf = SparkConf().setMaster("local").setAppName("MyApp")
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

#read in 500mb csv as DataFrame
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
     inferschema='true').load('myfile.csv')

#get dataframe into machine learning format
r_formula = RFormula(formula = "outcome ~ .")
mldf = r_formula.fit(df).transform(df)

#fit random forest model
rf = RandomForestClassifier(numTrees = 3, maxDepth = 2)
model = rf.fit(mldf)
result = model.transform(mldf).head()

使用 spark-submit 在单节点上运行上述代码时,即使在将数据框大小缩小以适应模型拟合之前(例如 tinydf = df.sample(False, 0.00001)),也会反复引发以下错误:

Traceback (most recent call last):
  File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/daemon.py", line 157, 
     in manager
  File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/daemon.py", line 61, 
     in worker
  File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/worker.py", line 136, 
     in main if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/serializers.py", line 545, 
     in read_int
    raise EOFError
  EOFError

1
你能否给刚发布的Spark 2.1.0一个机会? - Jacek Laskowski
2
你能把你正在尝试读取的 CSV 文件放在某个服务上吗?这样我们就可以查看一下了。 - Łukasz Gawron
1
我也看到了这个问题,不过是使用 JSON 而不是 CSV。 - rjurney
从Hadoop读取数据框。Numerous errors like Traceback (most recent call last): File "/home/sey1pal/distr/spark-2.1.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager ... File "/home/sey1pal/distr/spark-2.1.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 199, in main if read_int(infile) == SpecialLengths.END_OF_STREAM: File "/home/sey1pal/distr/spark-2.1.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 577, in read_int raise EOFError - y.selivonchyk
您可以调用 read.csv 而不是指定格式,我还会确保编码和分隔符(也称为分隔符)与您的 csv 文件一致。这些可以在 .options() 下进行操作,语法与您指定的 inferschemaheader 相同。更多选项可以在此处找到。 - Will
显示剩余2条评论
3个回答

3

这个错误似乎是在pySpark的read_int函数中发生的。代码如下,来源于Spark网站

def read_int(stream):
length = stream.read(4)
if not length:
    raise EOFError
return struct.unpack("!i", length)[0]

这意味着从流中读取4个字节时,如果没有读取到任何字节,则会引发EOF错误。有关Python文档,请参见此处

0
你有检查过代码中 EOError 出现的位置吗?
我猜测它是在你试图定义 df 时出现的,因为这是你的代码中唯一尝试读取文件的地方。
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
     inferschema='true').load('myfile.csv')

在这行代码之后的每个点,您的代码都是使用变量df而不是文件本身进行操作,因此似乎这行代码会产生错误。
测试是否为此情况的一种简单方法是注释掉其余代码,或在上面的行之后放置像这样的一行代码。
print(len(df))

另一种方法是使用 try 循环,例如:
try:
    df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
     inferschema='true').load('myfile.csv')
except:
    print("Failed to load file into df!")

如果发现那一行代码是导致EOFError的原因,那么你根本没有得到数据框,所以试图减少它们不会有任何影响。

如果这是生成错误的代码行,则有两种可能:

  1. 您的代码在较早的时候调用了一个或两个.csv文件,并且在此行之前没有关闭它。如果是这样,请在此处上面简单地关闭它。

  2. .csv文件本身存在问题。尝试在此代码之外加载它们,并查看是否可以使用类似csv.reader的东西将它们正确地加载到内存中,并按预期方式操作它们。


0

我遇到了相同的问题,不知道如何调试。似乎会导致执行线程卡住,永远不会返回任何结果。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接