psycopg2中的COPY命令

3
我有一个流程,需要从四个数据库的每个四个表格中读取数据。我将这些数据合并到一个总共有四个表格的Postgres数据库中(原始的四个数据库都有相同需要合并的四个表格)。
我现在使用pandas来完成这个流程。我一次从所有四个数据库中读取一个表格,将数据合并到一个数据帧中,然后使用to_sql将其保存到我的Postgres数据库中。然后,我循环遍历其他表格,并对其余的数据库执行相同的操作。
我的问题是速度。我的其中一个表格每天有大约1-2百万行的数据,因此需要大约5,000-6,000秒才能将数据写入Postgres。将数据写入.csv文件,然后在pgadmin中使用COPY FROM会更快。
以下是我的当前代码。请注意,其中有一些函数调用,但基本上只涉及表格名称。我还进行了一些基本日志记录,但这并不是太必要。我添加了一个源数据库的列,这是必需的。我从实际上是字符串的字段中去掉.0,但是pandas也将它们视为浮点数,并用0填充空整数,并确保列真正是int类型。
def query_database(table, table_name, query_date):
    df_list = []
    log_list = []
    for db in ['NJ', 'NJ2', 'LA', 'NA']:
        start_time = time.clock()
        query_timestamp = dt.datetime.now(pytz.timezone('UTC')).strftime('%Y-%m-%d %H:%M:%S')
        engine_name = '{}{}{}{}'.format(connection_type, server_name, '/', db)
        print('Accessing {} from {}'.format((select_database(db)[0][table]), engine_name))
        engine = create_engine(engine_name)
        df = pd.read_sql_query(query.format(select_database(db)[0][table]), engine, params={query_date})
        query_end = time.clock() - start_time
        df['source_database'] = db
        df['insert_date_utc'] = query_timestamp
        df['row_count'] = df.shape[0]
        df['column_count'] = df.shape[1]
        df['query_time'] = round(query_end, 0)
        df['maximum_id'] = df['Id'].max()
        df['minimum_id'] = df['Id'].min()
        df['source_table'] = table_dict.get(table)
        log = df[['insert_date_utc', 'row_date', 'source_database', 'source_table', 'row_count', 'column_count', 'query_time', 'maximum_id', 'minimum_id']].copy()
        df.drop(['row_count', 'column_count', 'query_time', 'maximum_id', 'minimum_id', 'source_table'], inplace=True, axis=1)
        df_list.append(df)
        log_list.append(log)
    log = pd.concat(log_list)
    log.drop_duplicates(subset=['row_date', 'source_database', 'source_table'], inplace=True, keep='last')
    result = pd.concat(df_list)
    result.drop_duplicates('Id', inplace=True)
    cols = [i.strip() for i in (create_columns(select_database(db)[0][table]))]
    result = result[cols]
    print('Creating string columns for {}'.format(table_name))
    for col in modify_str_cols(select_database(db)[0][table]):
        create_string(result, col)
    print('Creating integer columns for {}'.format(table_name))
    for col in modify_int_cols(select_database(db)[0][table]):
        create_int(result, col)
    log.to_sql('raw_query_log', cms_dtypes.pg_engine, index=False, if_exists='append', dtype=cms_dtypes.log_dtypes)
    print('Inserting {} data into PostgreSQL'.format(table_name))
    result.to_sql(create_table(select_database(db)[0][table]), cms_dtypes.pg_engine, index=False, if_exists='append', chunksize=50000, dtype=create_dtypes(select_database(db)[0][table]))

如何在这个过程中插入COPY TO和COPY FROM以加快速度?我应该只是编写.csv文件,然后循环处理那些文件,还是可以从内存中将数据COPY到我的postgres数据库中?

1个回答

3

psycopg2提供了许多与copy相关的API。如果您想使用csv,则必须使用copy_expert(它允许您指定完整的copy语句)。

通常情况下,我会使用copy_expert()和类似于文件的对象来遍历磁盘上的文件。这似乎运行得相当不错。

尽管如此,在您的情况下,我认为copy_tocopy_from更匹配,因为这只是从Postgres到Postgres的传输。请注意,这些使用PostgreSQL的复制输出/输入语法而不是csv(如果要使用csv,则必须使用copy_expert

在做决策之前,请注意以下内容:

copy_to将内容复制到文件样式对象(例如StringIO),而copy_from/copy_expert从文件样式对象中读取数据。如果您想使用Panda数据框,则需要考虑一下,要么创建一个文件样式对象,要么使用csv以及StringIOcopy_expert来生成内存中的csv并加载它。


澄清一下,我正在合并的数据库来自MS SQL Server。不确定这是否有所不同。如果可以避免编写文件,那将是最好的。COPY命令能否接受pandas DataFrame?我正在向数据添加列并指定数据类型/填充空白。或者我应该先编写一个临时文件? - trench
是的,您需要提供一个“类似文件的对象”,但它可以从内存中提供数据。 - Chris Travers
所以在这一行之前,我可以保持所有的东西都不变:result.to_sql(create_table(select_database(db)[0][table]), cms_dtypes.pg_engine, index=False, if_exists='append', chunksize=50000, dtype=create_dtypes(select_database(db)[0][table]))这是我最终的内存数据,准备保存到数据库中。你能提供一个示例代码来写入我的表格吗?是用copy_to吗?如何确保我的标题不被包含?engine.copy_to('table_name', result) - trench
请查看 psycopg2 和 copy_to/copy_from 的文档:http://initd.org/psycopg/docs/cursor.html - Chris Travers
是的,我做过。只是不太清楚。没有一个例子真正展示了它如何在内存中与pandas dataframe一起使用。 - trench
请查看我最新添加的内容。这不是一个示例,而是至少关于可能方向的讨论。 - Chris Travers

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接