我需要以编程方式将数千万条记录插入Postgres数据库中。目前,我正在单个查询中执行数千个插入语句。
是否有更好的方法来完成这项任务,有我不知道的批量插入语句吗?
我需要以编程方式将数千万条记录插入Postgres数据库中。目前,我正在单个查询中执行数千个插入语句。
是否有更好的方法来完成这项任务,有我不知道的批量插入语句吗?
除了使用COPY之外,还有一种Postgres支持的多行值语法的替代方式。来自文档:
INSERT INTO films (code, title, did, date_prod, kind) VALUES
('B6717', 'Tampopo', 110, '1985-02-10', 'Comedy'),
('HG120', 'The Dinner Game', 140, DEFAULT, 'Comedy');
上面的代码插入了两行,但你可以任意扩展它,直到达到准备语句令牌的最大数量(可能是$999,但我不能100%确定)。有时候无法使用COPY命令,这是一个值得推荐的替代方案。
加快速度的一种方法是在事务中显式执行多次插入或复制操作(比如1000次)。Postgres的默认行为是在每个语句之后提交,所以通过批量提交,可以避免一些开销。正如Daniel回答中的指南所述,您可能需要禁用自动提交以使其工作。还要注意底部的评论建议将wal_buffers的大小增加到16 MB也可能有所帮助。
UNNEST
函数可与多行VALUES语法一起使用。我认为这种方法比使用COPY
慢,但在使用psycopg和Python时很有用(将Python list
传递给cursor.execute
变为pg ARRAY
):
INSERT INTO tablename (fieldname1, fieldname2, fieldname3)
VALUES (
UNNEST(ARRAY[1, 2, 3]),
UNNEST(ARRAY[100, 200, 300]),
UNNEST(ARRAY['a', 'b', 'c'])
);
不使用 VALUES
,而是使用带有额外存在性检查的子选择。
INSERT INTO tablename (fieldname1, fieldname2, fieldname3)
SELECT * FROM (
SELECT UNNEST(ARRAY[1, 2, 3]),
UNNEST(ARRAY[100, 200, 300]),
UNNEST(ARRAY['a', 'b', 'c'])
) AS temptable
WHERE NOT EXISTS (
SELECT 1 FROM tablename tt
WHERE tt.fieldname1=temptable.fieldname1
);
使用相同的语法进行批量更新:
UPDATE tablename
SET fieldname1=temptable.data
FROM (
SELECT UNNEST(ARRAY[1,2]) AS id,
UNNEST(ARRAY['a', 'b']) AS data
) AS temptable
WHERE tablename.id=temptable.id;
"大数据"一词与"大量数据"有关,因此使用原始原始数据而无需将其转换为SQL是很自然的。用于"批量插入"的典型原始数据文件格式为CSV和JSON。
在ETL应用程序和摄取过程中,我们需要在插入数据之前更改数据。临时表会消耗(大量)磁盘空间,并且这不是最快的方法。PostgreSQL foreign-data wrapper(FDW)是最佳选择。
CSV示例。 假设在SQL上有tablename(x,y,z)
,以及一个类似的CSV文件
fieldname1,fieldname2,fieldname3
etc,etc,etc
... million lines ...
您可以使用传统的SQL COPY
将数据(原始数据)加载到 tmp_tablename
,然后将过滤后的数据插入 tablename
...但是,为了避免磁盘消耗,最好直接进行摄取。
INSERT INTO tablename (x, y, z)
SELECT f1(fieldname1), f2(fieldname2), f3(fieldname3) -- the transforms
FROM tmp_tablename_fdw
-- WHERE condictions
;
您需要为FDW准备数据库,而不是静态的tmp_tablename_fdw
,您可以使用生成它的函数:
CREATE EXTENSION file_fdw;
CREATE SERVER import FOREIGN DATA WRAPPER file_fdw;
CREATE FOREIGN TABLE tmp_tablename_fdw(
...
) SERVER import OPTIONS ( filename '/tmp/pg_io/file.csv', format 'csv');
JSON示例。一组包含myRawData1.json
和Ranger_Policies2.json
两个文件的数据可以进行摄取:
INSERT INTO tablename (fname, metadata, content)
SELECT fname, meta, j -- do any data transformation here
FROM jsonb_read_files('myRawData%.json')
-- WHERE any_condiction_here
;
函数jsonb_read_files()读取由掩码定义的文件夹中的所有文件:
CREATE or replace FUNCTION jsonb_read_files(
p_flike text, p_fpath text DEFAULT '/tmp/pg_io/'
) RETURNS TABLE (fid int, fname text, fmeta jsonb, j jsonb) AS $f$
WITH t AS (
SELECT (row_number() OVER ())::int id,
f AS fname,
p_fpath ||'/'|| f AS f
FROM pg_ls_dir(p_fpath) t(f)
WHERE f LIKE p_flike
) SELECT id, fname,
to_jsonb( pg_stat_file(f) ) || jsonb_build_object('fpath', p_fpath),
pg_read_file(f)::jsonb
FROM t
$f$ LANGUAGE SQL IMMUTABLE;
在“文件摄取”中(主要是在大数据领域),最常用的方法是将原始文件保留在gzip格式上,并使用流式算法进行传输,这样可以快速运行且不会消耗Unix管道中的磁盘空间:
gunzip remote_or_local_file.csv.gz | convert_to_sql | psql
所以理想情况下(未来)是 服务器选项,用于格式 .csv.gz
。
注意:根据 @CharlieClark 的评论,目前(2022年)没有什么可做的,最好的替代方案似乎是 pgloader
STDIN:
gunzip -c file.csv.gz | pgloader --type csv ... - pgsql:///target?foo
head -n 1000 file.csv > file_test.csv
进行测试,以通过FDW检查前1000行,问题可能是格式不正确的CSV。 PS:要修复格式不正确的CSV,可以使用csvformat。 - Peter Krauss这主要取决于数据库中的其他活动。类似此操作会冻结其他会话的整个数据库。另一个考虑因素是数据模型及其约束,触发器等是否存在。
我的第一步始终是:创建一个与目标表类似的(临时)表结构(create table tmp AS select * from target where 1=0
),然后将文件读入到临时表中。
然后我会检查可以检查的内容:重复项、已存在于目标表中的键等。
接下来我只需执行 do insert into target select * from tmp
或类似操作。
如果这失败或耗时太长,则放弃它并考虑其他方法(暂时删除索引/约束等)。
pgbulkinsert
的Java库,由Bytefish开发。我和我的团队能够在15秒内批量插入100万条记录。当然,我们还执行了一些其他操作,比如从Minio上的文件中读取1M+的记录,在1M+的记录上进行一些处理,如果有重复记录,则筛选掉记录,最后将1M条记录插入到Postgres数据库中。所有这些过程都在15秒内完成。我不记得确切的DB操作花费了多少时间,但我认为它大约少于5秒。请从https://www.bytefish.de/blog/pgbulkinsert_bulkprocessor.html获取更多详细信息。