Psycopg2:使用一个查询插入多行

237

我需要使用一条查询语句插入多行数据(行数不固定),因此我需要执行以下类似的查询语句:

INSERT INTO t (a, b) VALUES (1, 2), (3, 4), (5, 6);

我唯一知道的方法是:
args = [(1,2), (3,4), (5,6)]
args_str = ','.join(cursor.mogrify("%s", (x, )) for x in args)
cursor.execute("INSERT INTO t (a, b) VALUES "+args_str)

但是我希望有一些更简单的方法。

20个回答

298

我编写了一个程序,可以将多行数据插入到位于另一个城市的服务器中。

我发现使用这种方法比executemany快约10倍。在我的情况下,tup是包含大约2000行的元组。使用这种方法仅需要大约10秒钟:

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str) 

使用此方法时需要 2 分钟:

cur.executemany("INSERT INTO table VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s)", tup)

21
两年后仍然非常相关。今天的经验表明,随着要推送的行数增加,最好使用“执行”策略。我因此看到了大约100倍的速度提升! - Rob Watts
6
也许 executemany 在每次插入后都会执行提交。如果您改为将整个操作包装在一个事务中,也许可以加快速度? - Richard
5
我已证实这个改进。据我所了解,psycopg2的“executemany”并没有进行任何优化,只是循环执行多次“execute”语句。使用这种方法,一个远程服务器的700行插入操作由原来的60秒缩短到少于2秒。 - Nelson
10
也许我有点多疑,但是把查询语句与 + 连接起来似乎会导致 SQL 注入的风险,我觉得 @Clodoaldo Neto 的 execute_values() 解决方案更安全。 - Will Munn
63
如果有人遇到以下错误:[TypeError: sequence item 0: expected str instance, bytes found],请运行以下命令[args_str = ','.join(cur.mogrify("(%s,%s)", x).decode("utf-8") for x in tup)]。 - mrt
显示剩余9条评论

240

Psycopg 2.7 中的新 execute_values方法

data = [(1,'x'), (2,'y')]
insert_query = 'insert into t (a, b) values %s'
psycopg2.extras.execute_values (
    cursor, insert_query, data, template=None, page_size=100
)

在 Psycopg 2.6 中的 Pythonic 方式:

data = [(1,'x'), (2,'y')]
records_list_template = ','.join(['%s'] * len(data))
insert_query = 'insert into t (a, b) values {}'.format(records_list_template)
cursor.execute(insert_query, data)

说明:如果要插入的数据以元组列表的形式给出,如下所示:

data = [(1,'x'), (2,'y')]

那么它已经是所需格式的精确形式了

  1. insert子句的values语法期望一个记录列表,如下所示:

    insert into t (a, b) values (1, 'x'),(2, 'y')

  2. Psycopg将Python tuple适配为Postgresql record

唯一必要的工作是提供一个记录列表模板,由psycopg填充

# We use the data list to be sure of the template length
records_list_template = ','.join(['%s'] * len(data))

将其放入insert查询中。
insert_query = 'insert into t (a, b) values {}'.format(records_list_template)

打印insert_query输出

insert into t (a, b) values %s,%s

现在来说一下通常的Psycopg参数替换

cursor.execute(insert_query, data)

或者只是测试将发送到服务器的内容

print (cursor.mogrify(insert_query, data).decode('utf8'))

输出:

insert into t (a, b) values (1, 'x'),(2, 'y')

2
这种方法的性能与cur.copy_from相比如何? - Michael Goldshteyn
3
这里有一个基准测试的概要信息。在我的电脑上,使用10M条记录,copy_from 比其他方法快大约6.5倍。 - Joseph Sheedy
3
使用execute_values,我能够将我的系统性能从每分钟1千条记录提高到每分钟128千条记录。 - Conrad.Dean
2
@Phillipp,每个执行语句都是这样的,除非你处于自动提交模式。 - Chris
改进了插入30,000条记录的速度,从18分钟缩短到2分钟!!! - undefined
显示剩余3条评论

114

使用 psycopg2 2.7 更新:

传统的 executemany() 函数比 @ant32 实现的 "folded" 版本慢了约60倍,具体说明可以参见此线程:https://www.postgresql.org/message-id/20170130215151.GA7081%40deb76.aryehleib.com

在版本 2.7 中,将此实现加入到 psycopg2 中并命名为 execute_values():

from psycopg2.extras import execute_values
execute_values(cur,
    "INSERT INTO test (id, v1, v2) VALUES %s",
    [(1, 2, 3), (4, 5, 6), (7, 8, 9)])

之前的答案:

要插入多行数据,使用multirow VALUES语法加上execute()比使用psycopg2的executemany()大约快10倍。实际上,executemany()只运行许多单独的INSERT语句。

@ant32的代码在Python 2中完美运行。但在Python 3中,cursor.mogrify()返回字节,cursor.execute()接受字节或字符串,而','.join()则需要str实例。

因此,在Python 3中,您可能需要修改@ant32的代码,通过添加.decode('utf-8')

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x).decode('utf-8') for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str)
用字节(使用b''b"")也可以:
args_bytes = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute(b"INSERT INTO table VALUES " + args_bytes) 

谢谢,更新后的答案很好用。请不要忘记使用conn.commit()来保存更改。 - RicHincapie
1
execute_values()比@ant32更快吗? - étale-cohomology
execute_many可能不再是它曾经的样子了:从3.1开始,它使用pycopg的管道模式将命令批处理到一个Postgresql单个发送/接收序列中。 - Erik Knowles

38

cursor.copy_from 是我迄今为止找到的最快速的批量插入解决方案。 这里是一个 gist,其中包含一个名为 IteratorFile 的类,它允许将生成字符串的迭代器读取为文件。我们可以使用生成器表达式将每个输入记录转换为字符串。因此,解决方案如下:

args = [(1,2), (3,4), (5,6)]
f = IteratorFile(("{}\t{}".format(x[0], x[1]) for x in args))
cursor.copy_from(f, 'table_name', columns=('a', 'b'))

对于这种微小的args大小,速度差别不大,但是在处理成千上万行数据时可以看到很大的速度提升。相比构建一个巨大的查询字符串,这种方法也更加节省内存。迭代器一次只会在内存中保存一个输入记录,而使用查询字符串构建语句时可能会在Python进程或Postgres中耗尽内存。

4
这里有一个基准测试,将copy_from/IteratorFile与查询构建器解决方案进行比较。在我的计算机上,使用10M条记录,copy_from的扩展性大约比查询构建器快6.5倍。 - Joseph Sheedy
4
你需要费心处理转义字符串、时间戳等吗? - CpILL
1
是的,您需要确保拥有格式良好的TSV记录。 - Joseph Sheedy
嗨,五年后仍然相关!!我是Python和Postgres的新手,正在尝试找到将数据(从XML中提取)加载到Postgres的最有效和最快速的方法。所以我发现了@Joseph Sheedy的答案,并想尝试并测试它。但是我不明白def readline(self):函数应该在哪里或如何使用,因为我在代码中没有找到任何对它的引用?可以有人给一个完整的使用示例吗?请原谅我的天真。谢谢。 - marie20
@marie20,copy_from 隐式调用了 readline。这是文档链接:https://www.psycopg.org/docs/cursor.html#cursor.copy_from - Joseph Sheedy

33

以下是来自Psycopg2教程页面的片段,该页面可以在Postgresql.org(请参见底部)找到:

我想向您展示的最后一个项目是如何使用字典插入多行。 如果您有以下内容:

namedict = ({"first_name":"Joshua", "last_name":"Drake"},
            {"first_name":"Steven", "last_name":"Foo"},
            {"first_name":"David", "last_name":"Bar"})

你可以通过以下方式轻松地在字典中插入所有三行:

cur = conn.cursor()
cur.executemany("""INSERT INTO bar(first_name,last_name) VALUES (%(first_name)s, %(last_name)s)""", namedict)

虽然它没有节省太多代码,但它看起来明显更好一些。


46
这将运行许多个单独的INSERT语句。虽然有用,但与单个多值插入不同。 - Craig Ringer
2
在同一份文件中写道:cur.executemany语句将自动遍历字典并为每一行执行INSERT查询。 - sp1rs

12

安全漏洞

截至2022年11月16日,@Clodoaldo Neto(针对Psycopg 2.6)、@Joseph Sheedy、@J.J、@Bart Jonk、@kevo Njoki、@TKoutny和@Nihal Sharma的答案都存在SQL注入漏洞,不应使用。

迄今为止最快的提案(copy_from)也不应使用,因为很难正确转义数据。当尝试插入像'"\n\\t\n等字符时,这很容易明显。

psycopg2的作者也建议不要使用copy_from

copy_from()和copy_to()只是古老而不完整的方法

最快的方法

最快的方法是cursor.copy_expert,它可以直接从CSV文件中插入数据。

with open("mydata.csv") as f:
    cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", f)

copy_expert是生成CSV文件时最快的方法之一。请参考以下的CSVFile类,该类处理了内存使用限制问题。

import io, csv

class CSVFile(io.TextIOBase):
    # Create a CSV file from rows. Can only be read once.
    def __init__(self, rows, size=8192):
        self.row_iter = iter(rows)
        self.buf = io.StringIO()
        self.available = 0
        self.size = size

    def read(self, n):
        # Buffer new CSV rows until enough data is available
        buf = self.buf
        writer = csv.writer(buf)
        while self.available < n:
            try:
                row_length = writer.writerow(next(self.row_iter))
                self.available += row_length
                self.size = max(self.size, row_length)
            except StopIteration:
                break

        # Read requested amount of data from buffer
        write_pos = buf.tell()
        read_pos = write_pos - self.available
        buf.seek(read_pos)
        data = buf.read(n)
        self.available -= len(data)

        # Shrink buffer if it grew very large
        if read_pos > 2 * self.size:
            remaining = buf.read()
            buf.seek(0)
            buf.write(remaining)
            buf.truncate()
        else:
            buf.seek(write_pos)

        return data

这个类可以像这样使用:

rows = [(1, "a", "b"), (2, "c", "d")]
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", CSVFile(rows))

如果你的所有数据都可以放进内存,那么你也可以直接生成整个CSV数据,而不需要使用CSVFile类。但是,如果你不知道未来要插入多少数据,最好不要这样做。
f = io.StringIO()
writer = csv.writer(f)
for row in rows:
    writer.writerow(row)
f.seek(0)
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", f)

基准测试结果

  • 914毫秒 - 多次调用cursor.execute
  • 846毫秒 - cursor.executemany
  • 362毫秒 - psycopg2.extras.execute_batch
  • 346毫秒 - 使用page_size=1000execute_batch
  • 265毫秒 - 使用预处理语句的execute_batch
  • 161毫秒 - psycopg2.extras.execute_values
  • 127毫秒 - 通过字符串拼接使用cursor.execute
  • 39毫秒 - 一次性生成整个CSV文件的copy_expert
  • 32毫秒 - 使用CSVFilecopy_expert

嗨BlueSky。你能告诉我execute_values在SQL注入方面的问题是什么吗? - qyryq

8
在Postgres术语中,所有这些技术都被称为“Extended Inserts”。截至2016年11月24日,它仍然比psychopg2的executemany()和本文列出的所有其他方法(在来到此答案之前我尝试过)要快得多。
以下是一些代码,它不使用cur.mogrify,而且很容易理解:
valueSQL = [ '%s', '%s', '%s', ... ] # as many as you have columns.
sqlrows = []
rowsPerInsert = 3 # more means faster, but with diminishing returns..
for row in getSomeData:
        # row == [1, 'a', 'yolo', ... ]
        sqlrows += row
        if ( len(sqlrows)/len(valueSQL) ) % rowsPerInsert == 0:
                # sqlrows == [ 1, 'a', 'yolo', 2, 'b', 'swag', 3, 'c', 'selfie' ]
                insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*rowsPerInsert)
                cur.execute(insertSQL, sqlrows)
                con.commit()
                sqlrows = []
insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*len(sqlrows))
cur.execute(insertSQL, sqlrows)
con.commit()

但是需要注意的是,如果你能使用copy_from()方法,你应该使用copy_from()方法 ;)


从死亡中恢复,但在最后几行的情况下会发生什么?我假设您实际上会在剩下的最后几行再次运行最终子句,在您有偶数行的情况下是这样吗? - mcpeterson
正确的,抱歉我写示例时可能忘记做了这件事 - 这很愚蠢。不这样做不会给人们带来错误,这让我担心有多少人复制/粘贴解决方案然后继续他们的工作...... 无论如何,非常感谢mcpeterson - 谢谢! - J.J

4
自从这个问题被发布以来,execute_batch已经被添加到psycopg2中。它比execute_values更快。请注意保留HTML标签。

3
请见其他评论。psycopg2的execute_values方法比execute_batch 更快。 - Fierr

4

我之前使用的是ant32所提供的解决方案,已经用了好几年了。然而发现在Python 3中,这种方法会出现错误,因为mogrify返回的是一个字节字符串。

将字符串明确地转换为字节字符串是使代码兼容Python 3的简单解决方案。

args_str = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup) 
cur.execute(b"INSERT INTO table VALUES " + args_str)

更简单的选择是解码 cur.mogrify() - Hemil

4

executemany 接受元组数组。

https://www.postgresqltutorial.com/postgresql-python/insert/

    """ array of tuples """
    vendor_list = [(value1,)]

    """ insert multiple vendors into the vendors table  """
    sql = "INSERT INTO vendors(vendor_name) VALUES(%s)"
    conn = None
    try:
        # read database configuration
        params = config()
        # connect to the PostgreSQL database
        conn = psycopg2.connect(**params)
        # create a new cursor
        cur = conn.cursor()
        # execute the INSERT statement
        cur.executemany(sql,vendor_list)
        # commit the changes to the database
        conn.commit()
        # close communication with the database
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接