Pyspark - 加载文件:路径不存在

26

我是Spark的新手。我正在尝试在EMR集群中读取本地CSV文件。该文件位于:/home/hadoop/. 我使用的脚本如下:

spark = SparkSession \
    .builder \
    .appName("Protob Conversion to Parquet") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()\

df = spark.read.csv('/home/hadoop/observations_temp.csv, header=True)

当我运行脚本时,会出现以下错误信息:

pyspark.sql.utils.AnalysisException: u'Path does not exist: hdfs://ip-172-31-39-54.eu-west-1.compute.internal:8020/home/hadoop/observations_temp.csv

然后我发现我必须在文件路径中添加file://,这样它才能在本地读取文件:

df = spark.read.csv('file:///home/hadoop/observations_temp.csv, header=True)

但是这一次,上述方法出现了不同的错误:

在第0.0阶段丢失了任务0.3(TID 3,
ip-172-31-41-81.eu-west-1.compute.internal,执行器1): java.io.FileNotFoundException: 文件 file:/home/hadoop/observations_temp.csv不存在

我认为这是因为file://扩展名仅在本地读取文件,而不会将文件分布到其他节点。

你知道怎样可以读取csv文件并使其对所有其他节点可用吗?


另外,我发现将数据文件存储在S3中可以使生活变得更简单,一旦您授予集群访问您的存储桶。我知道这并没有直接解决您的问题,但还是想提一下。 - ImDarrenG
你是如何下载该文件的? - ImDarrenG
我使用cli os.system("aws s3 cp "s3://raw_data/files/observation.protob /home/hadoop/mount_point/s3))下载文件。由于文件大小,我将其下载到不同的卷中。从那里,我可以读取它并在/home/hadoop/中生成输出文件。 - ultraInstinct
你的集群管理器是什么?Spark独立模式还是YARN? - mrsrinivas
相关内容:https://dev59.com/elkT5IYBdhLWcg3wPdKU - duplex143
显示剩余2条评论
3个回答

28

您说得对,您的文件确实缺失于工作节点,因此导致了您遇到的错误。

这里是官方文档Ref. External Datasets

如果使用本地文件系统上的路径,文件必须在工作节点上的相同路径下也可访问。可以将文件复制到所有工作节点或使用网络挂载的共享文件系统。

因此,您基本上有两个解决方案:

在启动作业之前将文件复制到每个工作节点;

或者您可以像这样上传到HDFS:(推荐的解决方案)

hadoop fs -put localfile /user/hadoop/hadoopfile.csv

现在您可以使用以下方式阅读:

df = spark.read.csv('/user/hadoop/hadoopfile.csv', header=True)

看起来你也在使用AWS S3,你可以尝试直接从S3读取它,而不需要下载它。(当然要使用正确的凭证)

有些人建议使用spark-submit提供的--files标签将文件上传到执行目录。除非你的csv文件非常小,否则我不建议使用这种方法,但此时你也不需要Spark。

或者,我会坚持使用HDFS(或任何分布式文件系统)。


由于该答案现在已经两年了,我想知道是否有关于这个问题的任何更新? 这很奇怪和便宜,apache 不能让spark随机访问文件。我的意思是,什么样的分析引擎是那样的,甚至不能正确地获得对文件的访问权限? - Amir
谢谢!我使用master=local[4]来运行它,而不是使用集群,因此我不需要将文件分发到机器上或将其放入Hadoop中。顺便说一句,如果您需要一个集群来处理文件,则表示您需要一个分布式文件系统,并且应该将文件放入其中。大多数情况下,将文件分发到工作节点是不可行的。 - Onur Demir

5

我认为你缺少的是在初始化SparkSession时明确设置主节点,可以尝试像这样:

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("Protob Conversion to Parquet") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

然后以与您一直在使用的方式读取文件

df = spark.read.csv('file:///home/hadoop/observations_temp.csv')

这应该解决了问题...


你能给一个例子吗? - lightbox142

0

对于在Mac上使用Docker运行Zeppelin的人可能会有用。

  1. 将文件复制到自定义文件夹:/Users/my_user/zeppspark/myjson.txt

  2. docker run -p 8080:8080 -v /Users/my_user/zeppspark:/zeppelin/notebook --rm --name zeppelin apache/zeppelin:0.9.0

  3. 在Zeppelin上,您可以运行以下命令来获取您的文件:

%pyspark

json_data = sc.textFile('/zeppelin/notebook/myjson.txt')


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接