使用Pyspark将多个CSV文件读入DataFrame(或RDD)

8
我有一个Spark 2.0.2集群,通过Jupyter Notebook使用Pyspark进行访问。我有多个管道分隔的txt文件(加载到HDFS中,但也可以在本地目录中使用),需要使用spark-csv加载到三个不同的数据框中,具体取决于文件名。
我看到有三种方法可以采取 - 要么使用Python迭代遍历HDFS目录(还没有想出如何做到这一点),加载每个文件,然后执行联合操作。
我还知道Spark中存在一些通配符功能(请参见here)- 我可能可以利用这个功能。
最后,我可以使用pandas将磁盘上的vanilla csv文件加载为pandas dataframe,然后创建一个Spark dataframe。这里的缺点是这些文件很大,在单个节点上加载到内存中可能需要 ~8GB。(这就是为什么首先要将其移动到集群中的原因)。
这是我到目前为止拥有的代码和两种方法的伪代码:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

sc = pyspark.SparkContext(appName = 'claims_analysis', master='spark://someIP:7077')

spark = SparkSession(sc)

#METHOD 1 - iterate over HDFS directory
for currFile in os.listdir(HDFS:///someDir//):
    if #filename contains 'claim':
        #create or unionAll to merge claim_df
    if #filename contains 'pharm':
        #create or unionAll to merge pharm_df
    if #filename contains 'service':
        #create or unionAll to merge service_df

#Method 2 - some kind of wildcard functionality
claim_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<claim>.csv')
pharm_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<pharm>.csv')
service_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<service>.csv')


#METHOD 3 - load to a pandas df and then convert to spark df
for currFile in os.listdir(HDFS:///someDir//)
    pd_df = pd.read_csv(currFile, sep = '|')
    df = spark.createDataFrame(pd_df)
    if #filename contains 'claim':
        #create or unionAll to merge claim_df
    if #filename contains 'pharm':
        #create or unionAll to merge pharm_df
    if #filename contains 'service':
        #create or unionAll to merge service_df

有人知道如何实现方法1或2吗?我一直没能搞清楚这些。另外,让我感到惊讶的是,似乎没有更好的方法将csv文件加载到pyspark数据框中 - 使用第三方包处理这种似乎应该是本地功能的事情让我感到困惑(我错过了将csv文件加载到数据框中的标准用例吗?)最终,我将编写一个合并的单个数据框返回到HDFS(使用.write.parquet()),以便我可以释放内存并使用MLlib进行一些分析。如果我强调的方法不是最佳实践,我希望得到正确方向的指引!

我认为你在第二步上走对了。你遇到了错误或者其他问题吗?你尝试的方法有什么不起作用的地方吗? - santon
我一直收到“文件未找到”错误,所以我认为问题在于我的通配符实现。其次,所有与通配符匹配的文件是否会自动合并?我对Spark通配符功能仍然有点困惑。 - flyingmeatball
是的,Spark将联合匹配通配符的所有文件中的所有记录。如果您收到文件未找到的错误,请尝试使用硬编码URI到单个文件。 - santon
2个回答

18

方法一:

在Python中,您不能直接引用HDFS位置,需要使用另一个库(如pydoop)的帮助。在Scala和Java中,您有API可用。即使使用pydoop,您也将逐个读取文件。逐个读取文件而不使用Spark提供的并行读取选项是不好的。

方法二:

您应该能够使用逗号分隔或通配符指向多个文件。这样,Spark会负责读取文件并将它们分发到分区中。但是,如果您选择与每个数据框架进行联合,则存在一种边缘情况,当您动态地读取每个文件时。当您有很多文件时,列表在Driver级别可能会变得非常巨大,并且可能会导致内存问题。主要原因是读取过程仍在Driver级别进行。

这种选项更好。Spark将读取与正则表达式相关的所有文件并将其转换为分区。您将获得所有通配符匹配的一个RDD,从而无需担心单个RDD的联合。

示例代码片段:

distFile = sc.textFile("/hdfs/path/to/folder/fixed_file_name_*.csv")

方法三:

除非您有一些使用 pandas 特性的 Python 遗留应用程序,否则我更倾向于使用 Spark 提供的 API。


1
谢谢回复 - 所以听起来你建议选项2。我不太担心文件数量,而是文件大小。通配符会自动将文件附加在一起吗?例如,如果有3个符合通配符的文件,它是否会自动将它们联合起来,还是返回3个单独的文件列表? - flyingmeatball

0

我来到这里是为了完成类似的任务。我有一个函数,它将读取HDFS并返回一个字典列表。

def get_hdfs_input_files(hdfs_input_dir):
    """Returns a dictionary object with a file list from HDFS
    :rtype: dict
    """
    import subprocess
    sub_proc_cmd = "hdfs dfs -ls " + hdfs_input_dir + " | awk '{print $8}'"
    process = subprocess.run(sub_proc_cmd, shell=True, stdout=subprocess.PIPE)
    decoded_process = process.stdout.decode('utf-8')
    file_list = decoded_process.split("\n")
    claim_list, pharma_list, service_list = [], [], []
    for file in file_list:
        if file[-4:] == 'claim':
            claim_list.append(file)
        elif file[-4:] == 'pharma':
            pharma_list.append(file)
        elif file[-3:] == 'service':
            service_list.append(file)
    ret_dict = {'claim': claim_list, 'pharma': pharma_list, 'service': service_list}
    return ret_dict

一旦您拥有CSV文件列表,您可以使用Pyspark将它们全部读入RDD中。文档指出,CSV DataFrameReader将接受“字符串或字符串列表作为输入路径,或存储CSV行的字符串RDD”。只需将文件列表传递给该方法即可。

file_list = get_hdfs_input_files('/some/hdfs/dir')
claim_df = spark.read.csv(my_list.get('claim'), 
               delimiter = '|',header ='true',nullValue ='null')  
pharma_df = spark.read.csv(my_list.get('pharma'), 
               delimiter = '|',header ='true',nullValue ='null')
service_df = spark.read.csv(my_list.get('service'), 
               delimiter = '|',header ='true',nullValue ='null')

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接