我有一个Spark 2.0.2集群,通过Jupyter Notebook使用Pyspark进行访问。我有多个管道分隔的txt文件(加载到HDFS中,但也可以在本地目录中使用),需要使用spark-csv加载到三个不同的数据框中,具体取决于文件名。
我看到有三种方法可以采取 - 要么使用Python迭代遍历HDFS目录(还没有想出如何做到这一点),加载每个文件,然后执行联合操作。
我还知道Spark中存在一些通配符功能(请参见here)- 我可能可以利用这个功能。
最后,我可以使用pandas将磁盘上的vanilla csv文件加载为pandas dataframe,然后创建一个Spark dataframe。这里的缺点是这些文件很大,在单个节点上加载到内存中可能需要 ~8GB。(这就是为什么首先要将其移动到集群中的原因)。
这是我到目前为止拥有的代码和两种方法的伪代码:
有人知道如何实现方法1或2吗?我一直没能搞清楚这些。另外,让我感到惊讶的是,似乎没有更好的方法将csv文件加载到pyspark数据框中 - 使用第三方包处理这种似乎应该是本地功能的事情让我感到困惑(我错过了将csv文件加载到数据框中的标准用例吗?)最终,我将编写一个合并的单个数据框返回到HDFS(使用.write.parquet()),以便我可以释放内存并使用MLlib进行一些分析。如果我强调的方法不是最佳实践,我希望得到正确方向的指引!
我看到有三种方法可以采取 - 要么使用Python迭代遍历HDFS目录(还没有想出如何做到这一点),加载每个文件,然后执行联合操作。
我还知道Spark中存在一些通配符功能(请参见here)- 我可能可以利用这个功能。
最后,我可以使用pandas将磁盘上的vanilla csv文件加载为pandas dataframe,然后创建一个Spark dataframe。这里的缺点是这些文件很大,在单个节点上加载到内存中可能需要 ~8GB。(这就是为什么首先要将其移动到集群中的原因)。
这是我到目前为止拥有的代码和两种方法的伪代码:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
sc = pyspark.SparkContext(appName = 'claims_analysis', master='spark://someIP:7077')
spark = SparkSession(sc)
#METHOD 1 - iterate over HDFS directory
for currFile in os.listdir(HDFS:///someDir//):
if #filename contains 'claim':
#create or unionAll to merge claim_df
if #filename contains 'pharm':
#create or unionAll to merge pharm_df
if #filename contains 'service':
#create or unionAll to merge service_df
#Method 2 - some kind of wildcard functionality
claim_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<claim>.csv')
pharm_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<pharm>.csv')
service_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<service>.csv')
#METHOD 3 - load to a pandas df and then convert to spark df
for currFile in os.listdir(HDFS:///someDir//)
pd_df = pd.read_csv(currFile, sep = '|')
df = spark.createDataFrame(pd_df)
if #filename contains 'claim':
#create or unionAll to merge claim_df
if #filename contains 'pharm':
#create or unionAll to merge pharm_df
if #filename contains 'service':
#create or unionAll to merge service_df
有人知道如何实现方法1或2吗?我一直没能搞清楚这些。另外,让我感到惊讶的是,似乎没有更好的方法将csv文件加载到pyspark数据框中 - 使用第三方包处理这种似乎应该是本地功能的事情让我感到困惑(我错过了将csv文件加载到数据框中的标准用例吗?)最终,我将编写一个合并的单个数据框返回到HDFS(使用.write.parquet()),以便我可以释放内存并使用MLlib进行一些分析。如果我强调的方法不是最佳实践,我希望得到正确方向的指引!