如何将DataFrame写入PostgreSQL表

179

DataFrame.to_sql方法,但它只适用于mysql、sqlite和oracle数据库。我无法将postgres连接或sqlalchemy引擎传递给此方法。

8个回答

240

从 pandas 0.14 开始(于2014年5月底发布),支持 postgresql。现在 sql 模块使用 sqlalchemy 支持不同的数据库类型。您可以为 postgresql 数据库传递一个 sqlalchemy 引擎(请参见 文档)。例如:

from sqlalchemy import create_engine
engine = create_engine('postgresql://username:password@localhost:5432/mydatabase')
df.to_sql('table_name', engine)

你说得没错,在pandas 0.13.1版本之前是不支持postgresql的。如果你需要使用旧版本的pandas,这里有一个pandas.io.sql的修补版本:https://gist.github.com/jorisvandenbossche/10841234
我写了这个很久以前的版本,不能完全保证它总是有效的,但基础应该是存在的)。如果你把该文件放在你的工作目录中并导入它,那么你应该能够执行以下操作(其中con是postgresql连接):

import sql  # the patched version (file is named sql.py)
sql.write_frame(df, 'table_name', con, flavor='postgresql')

1
这个是否已经在0.14版本中实现了? - Quant
是的,0.15版本已经发布(发行候选版)。我会更新答案,谢谢提问。 - joris
1
这篇帖子为我解决了问题:https://dev59.com/LWAf5IYBdhLWcg3w3l6l - srodriguex
2
注意:to_sql在postgres中不导出数组类型。 - Saurabh Saha
6
我可以。我需要翻译的内容是:“Instead of creating a new Sqlalchemy engine, can I use an existing Postgres connection created using psycopg2.connect()?” - Underoos
1
对于编写表格,这是不可能的。它需要一个SQLAlchemy引擎或连接。 - joris

141

更快的选项:

以下代码将比df.to_sql方法更快地将您的Pandas DF复制到postgres数据库中,您不需要任何中间csv文件来存储df。

根据您的数据库规格创建引擎。

在您的postgres数据库中创建一个表,其列数与数据框(df)相等。

DF中的数据将被插入到您的postgres表中。

from sqlalchemy import create_engine
import psycopg2 
import io

如果您想要替换该表格,我们可以使用df的标题,通过普通的to_sql方法将整个耗时较长的df加载到数据库中。

engine = create_engine(
    'postgresql+psycopg2://username:password@host:port/database')

# Drop old table and create new empty table
df.head(0).to_sql('table_name', engine, if_exists='replace',index=False)

conn = engine.raw_connection()
cur = conn.cursor()
output = io.StringIO()
df.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cur.copy_from(output, 'table_name', null="") # null values become ''
conn.commit()
cur.close()
conn.close()

5
为什么要执行 output.seek(0) 呢? - moshevi
19
这很快,以至于它很有趣 :D - Shadi
4
如果你想使用模式(schema),可以在代码的to_sql部分添加参数schema=your_schema - Jonas Palačionis
3
psycopg 版本 2.9 开始,写入特定模式的数据时,不能再使用 cur.copy_from 方法:版本 2.9 中有更改:现在表和字段名已经被引用。如果您需要指定带架构限定符的表,请使用 copy_expert()。以下是使用 copy_expert 的示例:cur.copy_expert('COPY schema_name.table_name FROM STDIN', output) - Alexandre Léonard
9
3年后,我又来到了这里... ¯\(ツ) - Shadi
显示剩余15条评论

51

Pandas 0.24.0+ 解决方案

在Pandas 0.24.0中引入了一个新的功能,专门为快速写入Postgres而设计。您可以在此处了解更多信息:https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method

import csv
from io import StringIO

from sqlalchemy import create_engine

def psql_insert_copy(table, conn, keys, data_iter):
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ', '.join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = '{}.{}'.format(table.schema, table.name)
        else:
            table_name = table.name

        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
            table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)

engine = create_engine('postgresql://myusername:mypassword@myhost:5432/mydatabase')
df.to_sql('table_name', engine, method=psql_insert_copy)

4
大部分情况下,添加 method='multi' 选项就足够快了。但是,确实这个 COPY 方法是目前最快的方式。 - ssword
我尝试使用这种方法,但是它抛出了一个错误:“表'XYZ'已经存在”。据我所知,它不应该创建一个表,对吗? - E. Epstein
1
@E.Epstein - 你可以修改最后一行为df.to_sql('table_name', engine, if_exists='replace', method=psql_insert_copy) - 这将在你的数据库中创建一个表。 - mgoldwasser
这个版本比@Aseem的版本更快吗? - ciurlaro
你可能需要在表名代码周围添加引号,否则它无法处理表名中的破折号。 sql = 'COPY "{}" ({}) FROM STDIN WITH CSV'.format(table_name, columns) - Danferno
显示剩余4条评论

43

这是我完成它的方式。

使用execute_batch可能会更快:

# df is the dataframe
if len(df) > 0:
    df_columns = list(df)
    # create (col1,col2,...)
    columns = ",".join(df_columns)

    # create VALUES('%s', '%s",...) one '%s' per column
    values = "VALUES({})".format(",".join(["%s" for _ in df_columns])) 

    #create INSERT INTO table (columns) VALUES('%s',...)
    insert_stmt = "INSERT INTO {} ({}) {}".format(table,columns,values)

    cur = conn.cursor()
    psycopg2.extras.execute_batch(cur, insert_stmt, df.values)
    conn.commit()
    cur.close()

2
我得到了AttributeError:模块“psycopg2”没有属性“extras”。这需要明确导入。import psycopg2.extras. - GeorgeLPerkins
这个函数比SQLAlchemy的解决方案快得多。 - Saurabh Saha
这似乎无法正确处理np.nan。如果您使用上述代码,则很可能在数据库中看到“NaN”字符串,而不是空值。 - Jeong Kim

1

以自定义架构的方式,带/不带索引快速将df写入表格的方法:

"""
Faster way to write df to table.
Slower way is to use df.to_sql()
"""

from io import StringIO

from pandas import DataFrame
from sqlalchemy.engine.base import Engine


class WriteDfToTableWithIndexMixin:
    @classmethod
    def write_df_to_table_with_index(
            cls,
            df: DataFrame,
            table_name: str,
            schema_name: str,
            engine: Engine
    ):
        """
        Truncate existing table and load df into table.
        Keep each column as string to avoid datatype conflicts.
        """
        df.head(0).to_sql(table_name, engine, if_exists='replace',
                          schema=schema_name, index=True, index_label='id')

        conn = engine.raw_connection()
        cur = conn.cursor()
        output = StringIO()
        df.to_csv(output, sep='\t', header=False,
                  index=True, index_label='id')
        output.seek(0)
        contents = output.getvalue()
        cur.copy_expert(f"COPY {schema_name}.{table_name} FROM STDIN", output)
        conn.commit()


class WriteDfToTableWithoutIndexMixin:
    @classmethod
    def write_df_to_table_without_index(
            cls,
            df: DataFrame,
            table_name: str,
            schema_name: str,
            engine: Engine
    ):
        """
        Truncate existing table and load df into table.
        Keep each column as string to avoid datatype conflicts.
        """
        df.head(0).to_sql(table_name, engine, if_exists='replace',
                          schema=schema_name, index=False)

        conn = engine.raw_connection()
        cur = conn.cursor()
        output = StringIO()
        df.to_csv(output, sep='\t', header=False, index=False)
        output.seek(0)
        contents = output.getvalue()
        cur.copy_expert(f"COPY {schema_name}.{table_name} FROM STDIN", output)
        conn.commit()

如果您的df中有列中的JSON值,则上述方法仍将正确加载所有数据,但json列将具有一些奇怪的格式。因此,将该JSON列转换为::json可能会生成错误。您必须使用to_sql()。添加method=multi以加快速度,并添加chunksize以防止机器冻结:

df.to_sql(table_name, engine, if_exists='replace', schema=schema_name, index=False, method='multi', chunksize=1000)

0
使用psycopg2,您可以使用本地SQL命令将数据写入Postgres表中。
import psycopg2
import pandas as pd

conn = psycopg2.connect("dbname='{db}' user='{user}' host='{host}' port='{port}' password='{passwd}'".format(
            user=pg_user,
            passwd=pg_pass,
            host=pg_host,
            port=pg_port,
            db=pg_db))
cur = conn.cursor()    
def insertIntoTable(df, table):
        """
        Using cursor.executemany() to insert the dataframe
        """
        # Create a list of tupples from the dataframe values
        tuples = list(set([tuple(x) for x in df.to_numpy()]))
    
        # Comma-separated dataframe columns
        cols = ','.join(list(df.columns))
        # SQL query to execute
        query = "INSERT INTO %s(%s) VALUES(%%s,%%s,%%s,%%s)" % (
            table, cols)
    
        try:
            cur.executemany(query, tuples)
            conn.commit()

        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
            conn.rollback()
            return 1

5
一个好的回答总是会包含解释为什么这样做可以解决问题,以便原帖作者和任何未来的读者都能从中学习。 - Tyler2P

-1
创建引擎(其中方言为'postgres'或'mysql'等):
from sqlalchemy import create_engine
engine = create_engine(f'{dialect}://{user_name}@{host}:{port}/{db_name}')
Session = sessionmaker(bind=engine) 

with Session() as session:
    df = pd.read_csv(path + f'/{file}') 
    df.to_sql('table_name', con=engine, if_exists='append',index=False)

它适用于大多数数据库,包括postgres。您必须在engine = create_engine(dialect ='postgres'等...)中指定方言。 - David

-1

适用于Python 2.7和Pandas 0.24.2并使用Psycopg2

Psycopg2连接模块

def dbConnect (db_parm, username_parm, host_parm, pw_parm):
    # Parse in connection information
    credentials = {'host': host_parm, 'database': db_parm, 'user': username_parm, 'password': pw_parm}
    conn = psycopg2.connect(**credentials)
    conn.autocommit = True  # auto-commit each entry to the database
    conn.cursor_factory = RealDictCursor
    cur = conn.cursor()
    print ("Connected Successfully to DB: " + str(db_parm) + "@" + str(host_parm))
    return conn, cur

连接到数据库

conn, cur = dbConnect(databaseName, dbUser, dbHost, dbPwd)

假设数据框已经存在,命名为df

output = io.BytesIO() # For Python3 use StringIO
df.to_csv(output, sep='\t', header=True, index=False)
output.seek(0) # Required for rewinding the String object
copy_query = "COPY mem_info FROM STDOUT csv DELIMITER '\t' NULL ''  ESCAPE '\\' HEADER "  # Replace your table name in place of mem_info
cur.copy_expert(copy_query, output)
conn.commit()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接