Problem reading a large file in Spark - Python


I have Spark for Python installed locally, and when I run the following code:

data=sc.textFile('C:\\Users\\xxxx\\Desktop\\train.csv')
data.first()

I get the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-11-fca93c6aedeb> in <module>()
----> 1 data.first()

C:\Spark\python\pyspark\rdd.pyc in first(self)
   1313         ValueError: RDD is empty
   1314         """
-> 1315         rs = self.take(1)
   1316         if rs:
   1317             return rs[0]

C:\Spark\python\pyspark\rdd.pyc in take(self, num)
   1295 
   1296             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1297             res = self.context.runJob(self, takeUpToNumLeft, p)
   1298 
   1299             items += res

C:\Spark\python\pyspark\context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    937         # SparkContext#runJob.
    938         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 939         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    940         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    941 

C:\Anaconda2\lib\site-packages\py4j\java_gateway.pyc in __call__(self, *args)
   1024         answer = self.gateway_client.send_command(command)
   1025         return_value = get_return_value(
-> 1026             answer, self.gateway_client, self.target_id, self.name)
   1027 
   1028         for temp_arg in temp_args:

C:\Spark\python\pyspark\sql\utils.pyc in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

C:\Anaconda2\lib\site-packages\py4j\protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    314                 raise Py4JJavaError(
    315                     "An error occurred while calling {0}{1}{2}.\n".
--> 316                     format(target_id, ".", name), value)
    317             else:
    318                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(Unknown Source)
    at java.net.SocketOutputStream.write(Unknown Source)
    at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
    at java.io.BufferedOutputStream.write(Unknown Source)
    at java.io.DataOutputStream.write(Unknown Source)
    at java.io.FilterOutputStream.write(Unknown Source)
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(Unknown Source)
    at java.net.SocketOutputStream.write(Unknown Source)
    at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
    at java.io.BufferedOutputStream.write(Unknown Source)
    at java.io.DataOutputStream.write(Unknown Source)
    at java.io.FilterOutputStream.write(Unknown Source)
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

I'm sure the path is correct, because I have tried other files in the same folder. I think the problem is the size of the file, which is about 3.4 GB.
Please help, thanks!

It's worth testing with Scala as well. - Rockie Yang
As @RockieYang said, you may want to start a spark-shell and try your code there to see why it fails. - Jonathan Taws
Are you sure the path is correct? I have read larger files with Spark without any problem. - Himaprasoon
I don't think you need multiple SparkContexts. Just use the SparkContext created by spark-shell, or use sqlContext. - Rockie Yang
Looking at your full stack trace, I think your Spark job is running out of memory in the local environment. I'd suggest increasing the Spark driver's memory and trying again. The connection reset on the file you are trying to access also amounts to a timeout; in addition, handle the IOException and post more of the stack trace. - Murtaza Kanchwala
2 Answers


Whether you run Spark in standalone or cluster mode, spark.driver.memory and spark.executor.memory default to 1 GB each. You can increase the memory for the driver and the executors by changing this configuration, either when you launch your Jupyter notebook or in the Spark conf file. That way, as long as your machine has enough RAM, you should be able to read the 3.4 GB CSV file.
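For example, if you create the SparkContext yourself from a plain Python/Jupyter session, one way to pass these settings before the driver JVM starts is the PYSPARK_SUBMIT_ARGS environment variable. This is only a sketch of that approach, not part of the original answer; the 6g values, the app name, and the local[*] master are placeholder assumptions, and the path is the one from the question:

import os

# Must be set before any SparkContext is created; the memory values are examples only.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 6g --executor-memory 6g pyspark-shell"

from pyspark import SparkContext

sc = SparkContext("local[*]", "read-large-csv")
data = sc.textFile('C:\\Users\\xxxx\\Desktop\\train.csv')
print(data.first())

Note that in local mode the executors run inside the driver process, so spark.driver.memory is usually the setting that matters.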


@KartikKannapur, thanks for the answer, but could you show me how to do that? - Escachator
You can run $ pyspark --executor-memory 6G --driver-memory 6G in your shell. This allocates 6 GB of RAM to spark.driver.memory and 6 GB to spark.executor.memory. You can allocate different memory values depending on your system configuration and application requirements. - KartikKannapur

As mentioned in the Spark documentation, the default maximum memory usage is 1 GB.

You can check the default values in the Spark configuration file, which on Linux is usually located at:

/etc/spark/conf/spark-defaults.conf

Look under these lines:
spark.driver.memory
spark.executor.memory
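For illustration (the 6g values are placeholders, not from this answer), the corresponding entries in spark-defaults.conf could look like:

spark.driver.memory    6g
spark.executor.memory  6g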

However, you cannot set this configuration directly from your application through SparkConf, because by that point the driver JVM has already started.
Instead, add the memory parameter when starting the Spark master/worker (reference link).
e.g. ./sbin/start-master.sh --memory 2G
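If you want to confirm which values actually took effect, a quick check from PySpark (assuming an already-running SparkContext named sc, such as the one the pyspark shell creates) might look like:

# Prints the configured memory sizes; the fallback string is shown when the 1 GB default applies.
print(sc.getConf().get("spark.driver.memory", "1g (default)"))
print(sc.getConf().get("spark.executor.memory", "1g (default)"))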
