I am reading a CSV file into a Spark DataFrame and running machine learning operations on it. However, I keep hitting a Python serialization EOFError - any ideas why? I thought it might be a memory issue - i.e. the file exceeding available RAM - but drastically reducing the size of the DataFrame did not prevent the EOF error.
Toy code and error below.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import RFormula
from pyspark.ml.classification import RandomForestClassifier

# set spark context
conf = SparkConf().setMaster("local").setAppName("MyApp")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# read in 500mb csv as DataFrame
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
    inferschema='true').load('myfile.csv')

# get dataframe into machine learning format
r_formula = RFormula(formula="outcome ~ .")
mldf = r_formula.fit(df).transform(df)

# fit random forest model
rf = RandomForestClassifier(numTrees=3, maxDepth=2)
model = rf.fit(mldf)
result = model.transform(mldf).head()
When I run the above code with spark-submit on a single node, it repeatedly throws the following error, even when the DataFrame is shrunk before fitting the model (e.g. tinydf = df.sample(False, 0.00001)):
Traceback (most recent call last):
File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/daemon.py", line 157,
in manager
File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/daemon.py", line 61,
in worker
File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/worker.py", line 136,
in main if read_int(infile) == SpecialLengths.END_OF_STREAM:
File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/serializers.py", line 545,
in read_int
raise EOFError
EOFError
Apart from using read.csv instead of specifying the format, I would also make sure the encoding and the delimiter (a.k.a. separator) match your csv file. These can be set under .options() with the same syntax as the inferschema and header you already specify. More options can be found here. - Will
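A minimal sketch of that suggestion, assuming Spark 2.x (where read.csv is built into the DataFrameReader and the com.databricks.spark.csv package is no longer needed); the sep and encoding values below are placeholders to be matched to the actual file:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("MyApp").getOrCreate()

# read the csv directly, making the delimiter and encoding explicit
# (sep=',' and encoding='UTF-8' are assumptions - adjust to the real file)
df = spark.read.csv('myfile.csv',
                    header=True,
                    inferSchema=True,
                    sep=',',
                    encoding='UTF-8')

On Spark 1.6 with the databricks package, the equivalent is to pass delimiter and charset through .options() on the existing format('com.databricks.spark.csv') reader.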