云SQL - Postgres - 插入速度非常慢

3
我可能习惯使用大数据技术,但当我尝试插入约300k行的总计30MB(作为csv文件)时,我认为在Postgres上花费15分钟进行插入是不可接受的。
我首先了解到,在GCP上增加总磁盘大小也会增加IOPS,因此我将磁盘从20GB升级到了400GB。
然后在互联网上查找后,我发现了这篇文章:https://naysan.ca/2020/05/09/pandas-to-postgresql-using-psycopg2-bulk-insert-performance-benchmark/ 我之前使用旧的df.to_sql()来插入我的数据(我太疯狂了,对吧!)。我记得MSSQL Driver有一个类似于fast_executemany的参数,但没有适用于psycopg2的内容。
因此,我尝试了该文中更快的方法,通过将df复制到内存中并写入数据库。通过所有这些升级(磁盘大小+代码优化),我将运行时间从30/35分钟缩短到了约15分钟。
改进了!但仍然需要大约1000秒。
这里是我正在做的示例:
import pandas as pd
import psycopg2
import time
from io import StringIO


if __name__ == '__main__':
    startTime = time.time()
    df = pd.read_gbq(
        "SELECT * FROM dataset.defaut",
        project_id="my_id_12345",
        location="europe-west1"
    )

    executionTime = (time.time() - startTime)
    print('Execution GBQ Read time in seconds: ' + str(executionTime))

    df_qgis = df[
        ["voie", "direction", "date_mesure",
         "dfo_id", "pk_ref", "type_defaut",
         "niveau_ref", "val_ref", "longueur",
         "pk_debut", "pk_fin", "pk", "pk_original"]]

    param_dic = {
        "host": "1.2.3.4",
        "database": "qgis",
        "user": "user",
        "password": "y0lo123"
    }


    def connect(params_dic):
        """ Connect to the PostgreSQL database server """
        conn = None
        try:
            # connect to the PostgreSQL server
            print('Connecting to the PostgreSQL database...')
            conn = psycopg2.connect(**params_dic)
        except (Exception, psycopg2.DatabaseError) as error:
            print(error)
            exit(1)
        print("Connection successful")
        return conn


    conn = connect(param_dic)


    def copy_from_stringio(conn, df, table):
        """
        Here we are going save the dataframe on disk as
        a csv file, load the csv file
        and use copy_from() to copy it to the table
        """
        # save dataframe to an in memory buffer
        buffer = StringIO()
        df.to_csv(buffer, index_label='id', header=False)
        buffer.seek(0)
        cursor = conn.cursor()
        try:
            cursor.copy_from(buffer, table, sep=",")
            conn.commit()
        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
            conn.rollback()
            cursor.close()
            return 1
        print("copy_from_file() done")
        cursor.close()


    copy_from_stringio(conn, df_qgis, "develop.defaut")

    executionTime = (time.time() - startTime)
    print('Execution final time in seconds: ' + str(executionTime))


我会尽力完成翻译,以下是您需要的内容:

我要针对的是表格本身:

CREATE TABLE IF NOT EXISTS public.defaut
(
    id integer GENERATED ALWAYS AS IDENTITY,
    voie varchar NOT NULL,
    direction varchar NULL,
    date_mesure varchar NOT NULL,
    dfo_id int4 NOT NULL,
    pk_ref varchar NOT NULL,
    type_defaut varchar NOT NULL,
    niveau_ref varchar NOT NULL,
    val_ref float8 NOT NULL,
    longueur float8 NULL,
    pk_debut float8 NOT NULL,
    pk_fin float8 NOT NULL,
    pk float8 NOT NULL,
    pk_original float8 NULL,
    secteur varchar NULL,
    date_sect2 varchar NULL
);
1个回答

0
根据Cloud SQL的最佳实践,为了加快导入速度(适用于小型实例),你可以临时提升实例的级别以改善大型数据集的导入性能。
在这里,你可以找到PostgreSQL实例的样机类型。
如果这些方法都没有帮助,我建议您联系GCP技术支持,使用他们的内部工具检查您的实例,看看是否有任何资源被耗尽。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接