Save a pandas DataFrame as a CSV file to a gcloud storage bucket

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import gc
import pandas as pd
import datetime
import numpy as np
import sys



APP_NAME = "DataFrameToCSV"

spark = SparkSession\
    .builder\
    .appName(APP_NAME)\
    .config("spark.sql.crossJoin.enabled","true")\
    .getOrCreate()

group_ids = [1,1,1,1,1,1,1,2,2,2,2,2,2,2]

dates = ["2016-04-01","2016-04-01","2016-04-01","2016-04-20","2016-04-20","2016-04-28","2016-04-28","2016-04-05","2016-04-05","2016-04-05","2016-04-05","2016-04-20","2016-04-20","2016-04-29"]

#event = [0,1,0,0,0,0,1,1,0,0,0,0,1,0]
event = [0,1,1,0,1,0,1,0,0,1,0,0,0,0]

dataFrameArr = np.column_stack((group_ids,dates,event))

df = pd.DataFrame(dataFrameArr,columns = ["group_ids","dates","event"])

The Python code above needs to run on a Spark cluster in gcloud Dataproc. I want to save the pandas DataFrame as a CSV file in the gcloud storage bucket gs://mybucket/csv_data/. How can I do this?
2 Answers


You can also use Dask for this: convert the pandas DataFrame to a Dask DataFrame, then write it out as CSV to Cloud Storage.

import dask.dataframe as dd
import pandas as pd

df  # your pandas DataFrame
ddf = dd.from_pandas(df, npartitions=1, sort=True)
# gcs is assumed to be an authenticated gcsfs.GCSFileSystem instance,
# whose session credentials are handed to the writer below
ddf.to_csv('gs://YOUR_BUCKET/ddf-*.csv', index=False, sep=',', header=False,
           storage_options={'token': gcs.session.credentials})

The storage_options argument is optional.
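When running on a Dataproc VM, one option is to let gcsfs (the library Dask uses for gs:// paths) pick up credentials from the instance metadata server rather than passing them explicitly. A minimal sketch, assuming the gcsfs package is installed; the bucket name and sample frame are placeholders:

import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame({"group_ids": [1, 2], "event": [0, 1]})  # hypothetical sample
ddf = dd.from_pandas(df, npartitions=1)

# 'token': 'cloud' tells gcsfs to fetch credentials from the GCE/Dataproc
# metadata server; omitting storage_options falls back to default auth.
ddf.to_csv('gs://YOUR_BUCKET/ddf-*.csv', index=False,
           storage_options={'token': 'cloud'})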


You have a typo in the last line; it should be ddf.to_csv - Avision

So, I found out how to do this. Continuing from the code above, here is the solution:
sc = SparkContext.getOrCreate()

from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)

# Convert the pandas DataFrame to a Spark DataFrame, collapse it to a single
# partition, and write one CSV part file (with a header) to the bucket.
sparkDf = sqlCtx.createDataFrame(df)
sparkDf.coalesce(1).write.option("header", "true").csv('gs://mybucket/csv_data')
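For what it's worth, the SparkSession created at the top of the question can do the same thing without SQLContext, which is deprecated as of Spark 2.0. A sketch under that assumption:

# Reuse the existing SparkSession instead of constructing an SQLContext.
sparkDf = spark.createDataFrame(df)

# coalesce(1) forces a single output partition, so a single CSV part file
# lands under gs://mybucket/csv_data/ next to a _SUCCESS marker.
sparkDf.coalesce(1).write.option("header", "true").csv('gs://mybucket/csv_data')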
