使用Pyspark同时读取多个CSV文件

17

我正在使用SPARK读取HDFS中的文件。有一种情况,我们从旧系统以CSV格式获取文件的块。

ID1_FILENAMEA_1.csv
ID1_FILENAMEA_2.csv
ID1_FILENAMEA_3.csv
ID1_FILENAMEA_4.csv
ID2_FILENAMEA_1.csv
ID2_FILENAMEA_2.csv
ID2_FILENAMEA_3.csv

使用HiveWareHouse Connector将这些文件加载到HIVE中的FILENAMEA,进行一些转换,比如添加默认值。类似地,我们有大约70个表。Hive表以ORC格式创建。表按ID分区。目前,我正在逐个处理所有这些文件。这需要很多时间。

我希望能够加快这个过程。这些文件将达到几个GB。

是否有任何方法可以同时读取所有FILENAMEA文件并将其加载到HIVE表中。


你提到了70个表,这些CSV文件的模式都相同吗?所有文件都在同一个目录中吗?如果是,您需要读取该目录中的所有文件还是只需读取其中一些文件?您能发布您当前正在使用的代码吗(仅包括读取和写入部分,不包括转换)?先感谢您! - Vincent Doba
@VincentDoba:感谢您的回复。每个表都有唯一的模式。是的,所有文件都在同一个目录中。我必须读取所有文件。我正在使用spark.read.csv(filename)。toDF(columns)函数。 - Raja
2个回答

24

您有两种方法在pyspark中读取多个CSV文件。如果所有CSV文件都在同一个目录中,并且具有相同的模式,您可以通过直接将目录路径作为参数传递并一次性读取它们来读取它们,如下所示:

spark.read.csv('hdfs://path/to/directory')

如果您有位于不同位置的 CSV 文件,或者在同一目录中但与其他 CSV/文本文件混在一起的 CSV 文件,您可以将它们作为表示路径列表的字符串传递给.csv()方法参数,如下所示:

spark.read.csv('hdfs://path/to/filename1,hdfs://path/to/filename2')

你可以在这里了解有关如何使用Spark读取CSV文件的更多信息。

如果你需要从HDFS目录下的文件列表构建此路径列表,你可以查看此答案,一旦你创建了路径列表,你可以将其转换为字符串以通过.csv()方法传递,方法是使用','.join(your_file_list)


谢谢。是否有任何方法可以使用通配符读取所有csv文件?例如spark.read.csv('hdfs://path/to/FILENAMEA') - Raja
不,这样行不通,spark.read 只接受字面文件路径。你需要先在普通的 Python 中列出并过滤文件列表,然后将其作为字符串参数传递给 spark.read.csv() - Vincent Doba
2
是的,从性能方面考虑,继续使用是很好的选择。Spark 就是为这种用例而设计的。 - Vincent Doba
1
我不认为有任何限制,但你应该测试一下。 - Vincent Doba
2
在Spark 2.0中,这不起作用。对于Spark 2.0,您需要执行以下操作:spark.read.csv(['hdfs://path/to/filename1','hdfs://path/to/filename2']) - Pyaive Oleg
显示剩余4条评论

2

使用spark.read.csv(["path1","path2","path3"...]),您可以从不同路径读取多个文件。但这意味着您首先必须制作一个路径列表,而不是逗号分隔的文件路径字符串。


对于多个文件,我发现这是唯一适用于我自己的解决方案,使用PySpark、Python和Java,都是通过Anaconda在Windows 10上安装的。我在网上找到了其他代码模式,但它们对我来说都不起作用,例如,一个逗号分隔的路径字符串,多个路径作为csv()方法的单独未命名参数。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接