Postgres数据库使用psycopg2批量更新行

9
我们需要对Postgres DB中的许多行进行批量更新,并希望使用以下SQL语法。我们如何使用psycopg2实现这一点?
UPDATE table_to_be_updated
SET msg = update_payload.msg
FROM (VALUES %(update_payload)s) AS update_payload(id, msg)
WHERE table_to_be_updated.id = update_payload.id
RETURNING *

尝试 1 - 传递值

我们需要将嵌套可迭代格式传递给 psycopg2 查询。对于 update_payload,我尝试了传递一个列表的列表,元组的列表和元组的元组。但是这些尝试都以各种错误失败了。

尝试 2 - 编写带有 __conform__ 的自定义类

我尝试编写一个自定义类来处理这些操作,该类将返回

(VALUES (row1_col1, row1_col2), (row2_col1, row2_col2), (...))

我按照这里的指示进行编码,但很明显我做错了什么。例如,在这种方法中,我将不得不处理表内所有值的引用,这将是繁琐且容易出错的。

class ValuesTable(list):
    def __init__(self, *args, **kwargs):
        super(ValuesTable, self).__init__(*args, **kwargs)

    def __repr__(self):
        data_in_sql = ""
        for row in self:
            str_values = ", ".join([str(value) for value in row])
            data_in_sql += "({})".format(str_values)
        return "(VALUES {})".format(data_in_sql)

    def __conform__(self, proto):
        return self.__repr__()

    def getquoted(self):
        return self.__repr__()

    def __str__(self):
        return self.__repr__()

编辑: 如果有比我原问题中的语法更快/更清洁地进行批量更新的方法,则请告诉我!


你在第一种方法中遇到了什么错误?你是如何执行 SQL 的?如果有大量的值,将它们插入到临时表中并从那里更新可能更容易。 - Jeremy
我不明白你想做什么,但从“psycopg2”文档中可以看出,你必须将参数作为序列传递。使用['tbl1','msg1','tbl2','msg2']作为命名参数“vars”执行你的查询 (VALUES (%s, %s), (%s, %s), ...) 是否已经帮助你了? - Yannic Hamann
我明白了。我已经更新了答案,使它更加清晰,表达出我不太在意按照我撰写的精确方式来完成它。重要的是以一个干净/快速的方式进行批量更新。 - pir
2个回答

23

要求:

  • Postgres表,包括字段id和msg(可能还有其他字段)
  • Python数据包含msg的新值
  • 应使用psycopg2更新Postgres表

示例表格

CREATE TABLE einstein(
   id CHAR(5) PRIMARY KEY,
   msg VARCHAR(1024) NOT NULL
);

测试数据

INSERT INTO einstein VALUES ('a', 'empty');
INSERT INTO einstein VALUES ('b', 'empty');
INSERT INTO einstein VALUES ('c', 'empty');

Python程序

这是一个假设性的、自包含的示例程序,其中引用了一位著名物理学家的话。

import sys
import psycopg2
from psycopg2.extras import execute_values


def print_table(con):
    cur = con.cursor()
    cur.execute("SELECT * FROM einstein")
    rows = cur.fetchall()
    for row in rows:
        print(f"{row[0]} {row[1]}")


def update(con, einstein_quotes):
    cur = con.cursor()
    execute_values(cur, """UPDATE einstein 
                           SET msg = update_payload.msg 
                           FROM (VALUES %s) AS update_payload (id, msg) 
                           WHERE einstein.id = update_payload.id""", einstein_quotes)
    con.commit()


def main():
    con = None
    einstein_quotes = [("a", "Few are those who see with their own eyes and feel with their own hearts."),
                       ("b", "I have no special talent. I am only passionately curious."),
                       ("c", "Life is like riding a bicycle. To keep your balance you must keep moving.")]

    try:
        con = psycopg2.connect("dbname='stephan' user='stephan' host='localhost' password=''")
        print_table(con)
        update(con, einstein_quotes)
        print("rows updated:")
        print_table(con)

    except psycopg2.DatabaseError as e:

        print(f'Error {e}')
        sys.exit(1)

    finally:

        if con:
            con.close()


if __name__ == '__main__':
    main()

预处理语句的替代方案

import sys
import psycopg2
from psycopg2.extras import execute_batch


def print_table(con):
    cur = con.cursor()
    cur.execute("SELECT * FROM einstein")
    rows = cur.fetchall()
    for row in rows:
        print(f"{row[0]} {row[1]}")


def update(con, einstein_quotes, page_size):
    cur = con.cursor()
    cur.execute("PREPARE updateStmt AS UPDATE einstein SET msg=$1 WHERE id=$2")
    execute_batch(cur, "EXECUTE updateStmt (%(msg)s, %(id)s)", einstein_quotes, page_size=page_size)
    cur.execute("DEALLOCATE updateStmt")
    con.commit()


def main():
    con = None
    einstein_quotes = ({"id": "a", "msg": "Few are those who see with their own eyes and feel with their own hearts."},
                       {"id": "b", "msg": "I have no special talent. I am only passionately curious."},
                       {"id": "c", "msg": "Life is like riding a bicycle. To keep your balance you must keep moving."})

    try:
        con = psycopg2.connect("dbname='stephan' user='stephan' host='localhost' password=''")
        print_table(con)
        update(con, einstein_quotes, 100)  #choose some meaningful page_size here
        print("rows updated:")
        print_table(con)

    except psycopg2.DatabaseError as e:

        print(f'Error {e}')
        sys.exit(1)

    finally:

        if con:
            con.close()


if __name__ == '__main__':
    main()

输出

上述程序将在调试控制台中输出以下内容:

a     empty
b     empty
c     empty
rows updated:
a     Few are those who see with their own eyes and feel with their own hearts.
b     I have no special talent. I am only passionately curious.
c     Life is like riding a bicycle. To keep your balance you must keep moving.

2
也许值得指出的是,“executemany”不比在循环中调用“execute”更快。它不能产生OP所要求的SQL语法。仍然为您的时间和对令人钦佩的人的美好引用点赞。 - Yannic Hamann
1
@YannicHamann 谢谢。确实是非常好的提示,我已经使用 execute_batch 更新了答案,它最多将 page_size 个更新语句组合成一个批处理,从而减少了服务器往返次数。我的重点在于问题的值传递方面。通过使用预处理语句,可以使其运行得更快。所示期望 SQL 语句的 FROM 子句将在表上工作,但在这种情况下,数据是通过使用可迭代格式在内存中传递的。或者我在这里漏掉了什么? - Stephan Schlecht
1
我认为你提供的答案非常易读,与 SQL OP 想要的代码相比,我更愿意在我正在使用的代码库中找到你的代码。然而,可能有理由生成他所要求的确切 SQL,但我认为这已经通过我的评论在他的帖子下得到解决了。 - Yannic Hamann
1
我已经更新了答案,使用了预处理语句,以便数据库能够缓存查询计划以实现更快的执行。 - Stephan Schlecht
我觉得很奇怪的是,执行许多更新一行的语句比单个更新多行的语句更快。我找不到任何相关信息。你有什么可以分享的吗? - pir
显示剩余3条评论

5
简短回答!使用execute_values(curs, sql, args),请参见文档
对于那些寻求简短明了的答案的人。 批量更新用户的示例代码;
from psycopg2.extras import execute_values

sql = """
    update users u
    set
        name = t.name,
        phone_number = t.phone_number
    from (values %s) as t(id, name, phone_number)
    where u.id = t.id;
"""

rows_to_update = [
    (2, "New name 1", '+923002954332'),
    (5, "New name 2", '+923002954332'),
]
curs = conn.cursor()  # Assuming you already got the connection object
execute_values(curs, sql, rows_to_update)

如果您正在使用uuid作为主键,并且在psycopg2中未注册uuid数据类型(将uuid保留为Python字符串),则始终可以使用此条件u.id = t.id :: uuid

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接