如何在Postgres中进行批量插入的最快方法?

359

我需要以编程方式将数千万条记录插入Postgres数据库中。目前,我正在单个查询中执行数千个插入语句。

是否有更好的方法来完成这项任务,有我不知道的批量插入语句吗?

12个回答

299

PostgreSQL有一份关于如何最好地初始化数据库的指南,他们建议使用COPY命令来批量加载行。该指南还提供了一些其他好的提示,如在加载数据之前删除索引和外键(然后再添加回来)以加速过程。


51
我在 https://dev59.com/7Wct5IYBdhLWcg3wSbfY 上写了更详细的内容来阐述。 - Craig Ringer
38
哇,"稍微详细一点"确实是我这周见过的最轻描淡写的说法了 ;) - culix
尝试安装包NpgsqlBulkCopy。 - Elyor
1
由于索引还用于数据库记录的物理布局,因此不确定在任何数据库中删除索引是否是一个好主意。 - Farjad
但是您推荐的,内存中没有任何东西!!!如果您的批处理大小可以是一个较小的数字,那么它的工作效果非常糟糕:(我尝试使用 npgsql CopyIn 类,因为它就像在 PG 查询语句中格式化映射 CSV。您可以尝试使用大型表格吗? - Elyor

165

除了使用COPY之外,还有一种Postgres支持的多行值语法的替代方式。来自文档

INSERT INTO films (code, title, did, date_prod, kind) VALUES
    ('B6717', 'Tampopo', 110, '1985-02-10', 'Comedy'),
    ('HG120', 'The Dinner Game', 140, DEFAULT, 'Comedy');

上面的代码插入了两行,但你可以任意扩展它,直到达到准备语句令牌的最大数量(可能是$999,但我不能100%确定)。有时候无法使用COPY命令,这是一个值得推荐的替代方案。


18
你知道这种方法的性能如何与COPY相比吗? - Grant Humphries
如果您遇到权限问题,请在尝试此操作之前使用COPY ... FROM STDIN。 - Andrew Scott Evans
1
如果您正在使用行级安全,这已经是您所能做的最好的了。自版本12起,“COPY FROM不支持带有行级安全的表”。 - Eloff
3
复制比扩展插入快得多。 - hipertracker
4
这里的性能对我来说非常完美。370K行数据在3.291秒内完成。 - Sam Autrey
显示剩余2条评论

29

加快速度的一种方法是在事务中显式执行多次插入或复制操作(比如1000次)。Postgres的默认行为是在每个语句之后提交,所以通过批量提交,可以避免一些开销。正如Daniel回答中的指南所述,您可能需要禁用自动提交以使其工作。还要注意底部的评论建议将wal_buffers的大小增加到16 MB也可能有所帮助。


1
值得一提的是,您可以在同一事务中添加许多插入/复制操作,其限制可能比您尝试的任何操作都要高得多。您可以在同一事务中添加数百万行数据而不会遇到问题。 - Sumeet Jain
@SumeetJain 是的,我只是在谈论每个事务中复制/插入数量的速度的'sweet spot'。 - Dana the Sane
这会在事务运行时锁定表吗? - Lambda Fairy

23

UNNEST函数可与多行VALUES语法一起使用。我认为这种方法比使用COPY慢,但在使用psycopg和Python时很有用(将Python list传递给cursor.execute变为pg ARRAY):

INSERT INTO tablename (fieldname1, fieldname2, fieldname3)
VALUES (
    UNNEST(ARRAY[1, 2, 3]), 
    UNNEST(ARRAY[100, 200, 300]), 
    UNNEST(ARRAY['a', 'b', 'c'])
);

不使用 VALUES ,而是使用带有额外存在性检查的子选择。

INSERT INTO tablename (fieldname1, fieldname2, fieldname3)
SELECT * FROM (
    SELECT UNNEST(ARRAY[1, 2, 3]), 
           UNNEST(ARRAY[100, 200, 300]), 
           UNNEST(ARRAY['a', 'b', 'c'])
) AS temptable
WHERE NOT EXISTS (
    SELECT 1 FROM tablename tt
    WHERE tt.fieldname1=temptable.fieldname1
);

使用相同的语法进行批量更新:

UPDATE tablename
SET fieldname1=temptable.data
FROM (
    SELECT UNNEST(ARRAY[1,2]) AS id,
           UNNEST(ARRAY['a', 'b']) AS data
) AS temptable
WHERE tablename.id=temptable.id;

18

外部文件是最好的典型大数据

"大数据"一词与"大量数据"有关,因此使用原始原始数据而无需将其转换为SQL是很自然的。用于"批量插入"的典型原始数据文件格式为CSVJSON

带有某些转换的批量插入

ETL应用程序和摄取过程中,我们需要在插入数据之前更改数据。临时表会消耗(大量)磁盘空间,并且这不是最快的方法。PostgreSQL foreign-data wrapper(FDW)是最佳选择。

CSV示例。 假设在SQL上有tablename(x,y,z),以及一个类似的CSV文件

fieldname1,fieldname2,fieldname3
etc,etc,etc
... million lines ...

您可以使用传统的SQL COPY 将数据(原始数据)加载到 tmp_tablename,然后将过滤后的数据插入 tablename...但是,为了避免磁盘消耗,最好直接进行摄取。

INSERT INTO tablename (x, y, z)
  SELECT f1(fieldname1), f2(fieldname2), f3(fieldname3) -- the transforms 
  FROM tmp_tablename_fdw
  -- WHERE condictions
;

您需要为FDW准备数据库,而不是静态的tmp_tablename_fdw,您可以使用生成它的函数

CREATE EXTENSION file_fdw;
CREATE SERVER import FOREIGN DATA WRAPPER file_fdw;
CREATE FOREIGN TABLE tmp_tablename_fdw(
  ...
) SERVER import OPTIONS ( filename '/tmp/pg_io/file.csv', format 'csv');

JSON示例。一组包含myRawData1.jsonRanger_Policies2.json两个文件的数据可以进行摄取:

INSERT INTO tablename (fname, metadata, content)
 SELECT fname, meta, j  -- do any data transformation here
 FROM jsonb_read_files('myRawData%.json')
 -- WHERE any_condiction_here
;

函数jsonb_read_files()读取由掩码定义的文件夹中的所有文件:

CREATE or replace FUNCTION jsonb_read_files(
  p_flike text, p_fpath text DEFAULT '/tmp/pg_io/'
) RETURNS TABLE (fid int, fname text, fmeta jsonb, j jsonb) AS $f$
  WITH t AS (
     SELECT (row_number() OVER ())::int id, 
           f AS fname,
           p_fpath ||'/'|| f AS f
     FROM pg_ls_dir(p_fpath) t(f)
     WHERE f LIKE p_flike
  ) SELECT id, fname,
         to_jsonb( pg_stat_file(f) ) || jsonb_build_object('fpath', p_fpath),
         pg_read_file(f)::jsonb
    FROM t
$f$  LANGUAGE SQL IMMUTABLE;

缺乏gzip流式传输

在“文件摄取”中(主要是在大数据领域),最常用的方法是将原始文件保留在gzip格式上,并使用流式算法进行传输,这样可以快速运行且不会消耗Unix管道中的磁盘空间:

 gunzip remote_or_local_file.csv.gz | convert_to_sql | psql 

所以理想情况下(未来)是 服务器选项,用于格式 .csv.gz

注意:根据 @CharlieClark 的评论,目前(2022年)没有什么可做的,最好的替代方案似乎是 pgloader STDIN

  gunzip -c file.csv.gz | pgloader --type csv ... - pgsql:///target?foo

当我尝试使用FDW处理一个非常大的(> 10 GB)CSV文件时,我遇到了Postges的内存问题(以及处理MySQL导出中的一些奇怪错误),并且找不到任何解决方法。 - Charlie Clark
@CharlieClark,PostgreSQL版本是多少?您可以在此处或PostgreSQL选项中报告错误。另一种解决方案是拆分...例如Unix head -n 1000 file.csv > file_test.csv进行测试,以通过FDW检查前1000行,问题可能是格式不正确的CSV。 PS:要修复格式不正确的CSV,可以使用csvformat - Peter Krauss
我在不同版本的Postgres中遇到了错误,但最近的是13。我没有追求其他选项,因为pgloader覆盖了大部分选项,但我对内存问题感到惊讶。 - Charlie Clark

12
您可以使用 COPY table TO ... WITH BINARY 命令,这个命令比文本和 CSV 格式的命令 "稍微快一点"。只有在需要插入数百万行数据且对二进制数据操作熟练时才应该使用此项技术。
以下是一个在 Python 中使用 psycopg2 进行二进制输入的示例:recipe

10

这主要取决于数据库中的其他活动。类似此操作会冻结其他会话的整个数据库。另一个考虑因素是数据模型及其约束,触发器等是否存在。

我的第一步始终是:创建一个与目标表类似的(临时)表结构(create table tmp AS select * from target where 1=0),然后将文件读入到临时表中。

然后我会检查可以检查的内容:重复项、已存在于目标表中的键等。

接下来我只需执行 do insert into target select * from tmp 或类似操作。

如果这失败或耗时太长,则放弃它并考虑其他方法(暂时删除索引/约束等)。


5

我刚刚遇到了这个问题,推荐使用csvsql发行版)进行Postgres的批量导入。要执行批量插入,您只需createdb,然后使用csvsql,它会连接到您的数据库并为整个CSV文件夹创建单独的表。

$ createdb test 
$ csvsql --db postgresql:///test --insert examples/*.csv

1
对于csvsql,为了清除源CSV中可能存在的任何格式错误,最好遵循这些说明,更多文档请参见此处 - sal

4

2
也许我已经来晚了。但是,有一个叫做pgbulkinsert的Java库,由Bytefish开发。我和我的团队能够在15秒内批量插入100万条记录。当然,我们还执行了一些其他操作,比如从Minio上的文件中读取1M+的记录,在1M+的记录上进行一些处理,如果有重复记录,则筛选掉记录,最后将1M条记录插入到Postgres数据库中。所有这些过程都在15秒内完成。我不记得确切的DB操作花费了多少时间,但我认为它大约少于5秒。请从https://www.bytefish.de/blog/pgbulkinsert_bulkprocessor.html获取更多详细信息。

1
我能够在3.7秒内插入100万条记录。 - Sajidur Rahman

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接