在Pyspark中读取多个目录下的parquet文件

32

我需要从不是父目录或子目录的多个路径中读取parquet文件。

例如,

dir1 ---
       |
       ------- dir1_1
       |
       ------- dir1_2
dir2 ---
       |
       ------- dir2_1
       |
       ------- dir2_2

sqlContext.read.parquet(dir1) 从 dir1_1 和 dir1_2 读取 Parquet 文件

目前我正在读取每个目录并使用 "unionAll" 合并数据框。是否有一种不使用 unionAll 读取 dir1_2 和 dir2_1 中的 Parquet 文件的方法,或者是否有任何花哨的方法可以使用 unionAll

谢谢


嗨,我有一个类似的任务需要读取多个Json文件,但是这里提供的代码都不起作用:( 你找到解决方案了吗? - zhifff
6个回答

64

虽然晚了一点,但我在搜索时找到了这个,它可能会帮助其他人...

你也可以尝试将参数列表拆开到spark.read.parquet()中。

paths=['foo','bar']
df=spark.read.parquet(*paths)

如果您想将几个 Blob 传递到路径参数中,这样做非常方便:

basePath='s3://bucket/'
paths=['s3://bucket/partition_value1=*/partition_value2=2017-04-*',
       's3://bucket/partition_value1=*/partition_value2=2017-05-*'
      ]
df=spark.read.option("basePath",basePath).parquet(*paths)

这很酷,因为你不需要列出basePath中的所有文件,仍然可以获得分区推断。


当我仅使用此代码时,它会在/home/目录中搜索目录,你能否请发布整个语法? - Viv
@N00b 当我尝试这段代码时,它会给我一个错误,说load只需要4个参数,但是我有24个文件的路径.. 是否有覆盖此限制的选项。我不想进行多个加载和联合,这就是为什么我想使用load将多个文件放入df中的原因。 - E B
对我来说完美运行!@EB,你是否将其保存为列表,然后作为表达式运行(*paths) - thentangler
你需要这个“*”吗?当我使用它时,会返回错误。 - undefined

14

如果您有一个文件列表,可以执行以下操作:

files = ['file1', 'file2',...]
df = spark.read.parquet(*files)

*是关键。当你处理生成目录列表的程序时,这是从该列表变量中读取值的最合适方式。 - Alex Raj Kaliamoorthy

13

SQLContextparquetFile方法和DataFrameReaderparquet方法都可以接受多个路径作为参数。因此,以下两种方式都是有效的:

df = sqlContext.parquetFile('/dir1/dir1_2', '/dir2/dir2_1')
或者
df = sqlContext.read.parquet('/dir1/dir1_2', '/dir2/dir2_1')

这些都对我没用。它会找到“可疑路径”,然后给我一个很长的Java清单。 - mic
我也遇到了同样的问题。你需要添加一个选项: .option("basePath", "file:///your/path/")就像这个答案中所示:https://dev59.com/xlwX5IYBdhLWcg3wlgPl#33656595 - AssafR

5

关于ORC技术

spark.read.orc("/dir1/*","/dir2/*")

Spark进入dir1/和dir2/文件夹并加载所有ORC文件。

对于Parquet文件,

spark.read.parquet("/dir1/*","/dir2/*")

4

我参考了John Conley的答案,并进行了一些补充,提供完整代码(在Jupyter PySpark中使用),因为我发现他的答案非常有用。

from hdfs import InsecureClient
client = InsecureClient('http://localhost:50070')

import posixpath as psp
fpaths = [
  psp.join("hdfs://localhost:9000" + dpath, fname)
  for dpath, _, fnames in client.walk('/eta/myHdfsPath')
  for fname in fnames
]
# At this point fpaths contains all hdfs files 

parquetFile = sqlContext.read.parquet(*fpaths)


import pandas
pdf = parquetFile.toPandas()
# display the contents nicely formatted.
pdf

0
在Spark-Scala中,你可以这样做。
val df = spark.read.option("header","true").option("basePath", "s3://bucket/").csv("s3://bucket/{sub-dir1,sub-dir2}/")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接