如何在sc.textFile中加载本地文件,而不是HDFS

121
我在学习优秀的 spark教程,现在尝试在46分处加载README.md文件,但是失败了。我的操作如下:
$ sudo docker run -i -t -h sandbox sequenceiq/spark:1.1.0 /etc/bootstrap.sh -bash
bash-4.1# cd /usr/local/spark-1.1.0-bin-hadoop2.4
bash-4.1# ls README.md
README.md
bash-4.1# ./bin/spark-shell
scala> val f = sc.textFile("README.md")
14/12/04 12:11:14 INFO storage.MemoryStore: ensureFreeSpace(164073) called with curMem=0, maxMem=278302556
14/12/04 12:11:14 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 160.2 KB, free 265.3 MB)
f: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at <console>:12
scala> val wc = f.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://sandbox:9000/user/root/README.md
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)

我该如何加载那个README.md文件?
15个回答

200

尝试明确指定sc.textFile(“file:///path to the file /”)。当设置Hadoop环境时,会出现错误。

SparkContext.textFile在内部调用org.apache.hadoop.mapred.FileInputFormat.getSplits,后者如果没有模式则使用org.apache.hadoop.fs.getDefaultUri。此方法读取Hadoop conf的“fs.defaultFS”参数。如果您设置了HADOOP_CONF_DIR环境变量,则该参数通常设置为“hdfs:// ...”; 否则为“file://”。


当我在Windows上指定路径时,为什么"file:///C:\\Xiang\\inputfile""file:////C:\\Xiang\\inputfile"都可以工作,而"file://C:\\Xiang\\inputfile"在Java代码中无法工作。在Linux上呢?前缀应该是file:///(三个斜杠)还是file:////(四个斜杠)?file:////在Linux上也可以工作吗? - XYZ
我检查了源代码,发现它是static final URI NAME = URI.create("file:///");,因此我认为应该将其硬编码为带有三个斜杠的前缀file:///。但我仍然不明白为什么file:////(四个斜杠)也可以工作。 - XYZ
@YuXiang 你想在GitHub源代码的某一行添加链接吗? - suztomo
@suztomo,它在这里:https://hadoop.apache.org/docs/r2.7.4/api/src-html/org/apache/hadoop/fs/RawLocalFileSystem.html - XYZ
@suztomo,我的错误信息:`java.lang.IllegalArgumentException: Wrong FS:file://C:\Xiang\cs_hdfs\csByDate\20190822/C:/Xiang/323Bit/bigfoot,expected: file:/// at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:80) at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:534) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)` - XYZ
显示剩余4条评论

26

gonbe的回答非常好。但我还想提醒一下,file:/// = ~/../../,而不是$SPARK_HOME。希望这能为像我这样的新手节省一些时间。


10
file:/// 是执行JVM所看到的文件系统的根目录,而不是在家庭文件夹上方两个级别。根据RFC 8089中指定的URI格式为 file://hostname/absolute/path。在本地情况下,“hostname”(权限)组件为空。 - Hristo Iliev

22
如果文件位于您的Spark主节点上(例如,在使用AWS EMR的情况下),则首先在本地模式下启动spark-shell。
$ spark-shell --master=local
scala> val df = spark.read.json("file:///usr/lib/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

或者,您可以先将文件从本地文件系统复制到HDFS,然后在其默认模式下启动Spark(例如,在使用AWS EMR的情况下为YARN),直接读取该文件。

$ hdfs dfs -mkdir -p /hdfs/spark/examples
$ hadoop fs -put /usr/lib/spark/examples/src/main/resources/people.json /hdfs/spark/examples
$ hadoop fs -ls /hdfs/spark/examples
Found 1 items
-rw-r--r--   1 hadoop hadoop         73 2017-05-01 00:49 /hdfs/spark/examples/people.json

$ spark-shell
scala> val df = spark.read.json("/hdfs/spark/examples/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

唯一一个告诉你如何在本地模式下开始的答案。这个需要更多的赞。 - Ted

22

尽管Spark支持从本地文件系统加载文件,但要求所有节点上的文件路径相同。

一些网络文件系统(如NFS、AFS和MapR的NFS层)对用户来说就像一个常规文件系统一样。

如果您的数据已经存储在这些系统之一中,则只需指定file://路径即可将其用作输入;只要每个节点上挂载的文件系统路径相同,Spark就会处理它。每个节点都需要有相同的路径。

 rdd = sc.textFile("file:///path/to/file")

如果您的文件尚未在集群中所有节点上,请在驱动程序上本地加载它,而无需通过Spark,并调用parallelize将其分发给工作节点。

请注意在文件路径前添加file://,并根据操作系统使用"/"或"\"。


1
Spark有没有一种自动将其$SPARK_HOME目录中的数据复制到所有计算节点的方式?还是需要手动操作? - Matthias
Spark源代码处理不同文件系统格式的位置在哪里? - Saher Ahwal

19

注意:

当从本地加载数据(sc.textFile("file:///path to the file/"))时,请确保在本地模式下运行Spark,否则您将会遇到如下错误:Caused by: java.io.FileNotFoundException: File file:/data/sparkjob/config2.properties does not exist。因为在不同的工作节点上运行的执行器将无法在其本地路径中找到此文件。


1
我们可以在本地文件驱动程序上运行Spark独立模式(驱动程序在一个节点上,执行器在其他节点上)吗?还是我应该在所有节点上都有本地文件? - sherminator35

14
"file:///directory/file" 是指定文件路径的格式,只需将路径替换为实际文件所在目录和文件名即可。
例如:
val textFile = sc.textFile("file:///usr/local/spark/README.md")

10

我在桌面上有一个名为NewsArticle.txt的文件。

在Spark中,我输入了:

val textFile= sc.textFile(“file:///C:/Users/582767/Desktop/NewsArticle.txt”)

我需要将文件路径中的所有 \ 字符更改为 /。

为了测试它是否起作用,我输入了:

textFile.foreach(println)

我正在使用Windows 7,但没有安装Hadoop。


6

我曾经在安装了Hadoop的公共用户主目录下,使用Spark 2.3时遇到过这个问题。由于Spark和Hadoop都是在同一个公共目录下安装的,因此Spark默认将方案视为hdfs,并开始在Hadoop的core-site.xml中指定的hdfs下查找输入文件。在这种情况下,我们需要明确地将方案指定为file:///<absoloute path to file>


5
这个问题已经在Spark邮件列表中讨论过了,请参考这封邮件:邮件
你应该使用 hadoop fs -put <localsrc> ... <dst> 命令将文件复制到 hdfs 中。
${HADOOP_COMMON_HOME}/bin/hadoop fs -put /path/to/README.md README.md

1
我尝试了以下内容,并且在我的本地文件系统中成功运行。基本上,Spark 可以从本地、HDFS 和 AWS S3 路径读取。
listrdd=sc.textFile("file:////home/cloudera/Downloads/master-data/retail_db/products")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接