使用Copy命令将Spark Dataframe导入PostgreSQL - Pyspark

3
我需要将一个Spark DataFrame写入到Postgres数据库中。我已经使用了以下代码:

df.write
.option("numPartitions",partions)
.option("batchsize",batchsize)
.jdbc(url=url, table="table_name", mode=append, properties=properties) 

这个工作得很好,但是我想与“复制”命令比较性能

尝试了以下操作:

output = io.StringIO() 

 csv_new.write
.format("csv")
.option("header", "true")
.save(path=output)

output.seek(0)
contents = output.getvalue()
cursor.copy_from(output, 'tb_pivot_table', null="") \\using psycopg2 
con_bb.commit() 

这似乎无法正常运行,出现了错误 'type' object is not iterable

在Pandas数据框中工作得很好

output= io.StringIO()
df.to_csv(path_or_buf=output,sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cursor.copy_from(output, 'tb_ts_devicedatacollection_aggregate', null="")  
con_bb.commit()

有没有关于如何在Pyspark中实现与Pandas相当的功能的线索。 顺便说一下:性能至关重要,因此将spark df转换为Pandas df不是一个选项。 任何帮助都将不胜感激。

3个回答

1

对我目前来说非常有效的方案(100-200GB的csv文件,大约1,000,000,000行)是使用psycopg2和多进程。

可用核心数:200

首先,我将Spark DataFrame导出为多个文件,这些文件是可用核心数的倍数。

filepath="/base_path/psql_multiprocessing_data"

df.repartition(400) \
    .write \
    .mode("overwrite") \
    .format("csv") \ # even faster using binary format, but ok with csv
    .save(filepath,header='false')

然后,我通过并行迭代文件夹中的所有文件。

import glob
import psycopg2   
from multiprocessing import Pool, cpu_count

file_path_list=sorted(glob.glob("/base_path/psql_multiprocessing_data/*.csv"))

def psql_copy_load(fileName):
    con = psycopg2.connect(database="my_db",user="my_user",password="my_password",host="my_host",port="my_port")
    cursor = con.cursor()
    with open(fileName, 'r') as f:
        # next(f)  # in case to skip the header row.
        cursor.copy_from(f, 'my_schema.my_table', sep=",")
    
    con.commit()
    con.close()
    return (fileName)
    

with Pool(cpu_count()) as p:
        p.map(psql_copy_load,file_path_list)

print("parallelism (cores): ",cpu_count())
print("files processed: ",len(file_path_list))

我没有尝试将数据进一步导出为二进制,因为正确的头部和数据类型会让事情变得复杂,而且我对运行时间感到满意,大约需要25-30分钟(有6列)。


0
据我所知,Spark没有提供内部使用copy命令的方法。
如果您想从hdfs加载postgres,您可能会对Sqoop感兴趣。它允许导出存储在hdfs上的csv。此外,它能够生成多个copy语句。在我的实验中,添加4个mappers可以将摄取速度提高2倍,而不是只有一个mapper。这应该比使用spark jdbc方式更快。
以下是步骤:
  1. df.write.csv("my/hdfs/folder")
  2. sqoop export --connect "jdbc:postgresql://postgres_host/postgres_db" --username --password-file file:///home/$USER/.password --export-dir my_csv_table --table -m 4 --direct --lines-terminated-by '\n' --fields-terminated-by ',' -- --schema


0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接