SQLAlchemy中的PostgreSQL ON CONFLICT。

27

我已经阅读了很多资源(例如12),但我无法在sqlalchemy中实现PostgreSQL的ON CONFLICT IGNORE行为。

我使用了这个被接受的答案作为基础,但它并没有起作用。

SAWarning: Can't validate argument 'append_string'; can't locate any SQLAlchemy dialect named 'append'

我已经尝试将postgresql方言添加到@compile子句中,并重新命名我的对象,但它没有起作用。我还尝试使用str(insert())+ " ON CONFLICT IGNORE",但没有结果(顺便说一下不奇怪)。如何在插入时添加On CONFLICT IGNORE?我喜欢建议的解决方案,因为我可以看到自己不想在每个INSERT上使用IGNORE行为。PS.使用Python 2.7(不介意升级到3.4 / 3.5),最新的SQLAlchemy(1.x)

你应该提供示例代码,因为如果你提到的这个被接受的答案对你有效(你测试过了吗?测试一下!),将UPDATE更改为IGNORE也应该有效,否则你可能有一个拼写错误。但是,如果你提到的代码也不起作用,那么你可能有其他问题。 - omikron
5个回答

24

使用Postgres 9.6.1,sqlalchemy 1.1.4和psycopg2 2.6.2:

  1. 将数据结构转换为字典。从Pandas中可以这样做:

import pandas
from sqlalchemy import MetaData
from sqlalchemy.dialects.postgresql import insert
import psycopg2

# The dictionary should include all the values including index values
insrt_vals = df.to_dict(orient='records')
  • 通过Sqlalchemy连接到数据库。相反,尝试在下面使用psycog2驱动程序和本机的COPY函数,绕过所有的Postgres索引。

  • csv_data = os.path.realpath('test.csv')
    con = psycopg2.connect(database = 'db01', user = 'postgres')
    cur = con.cursor()
    cur.execute("\copy stamm_data from '%s' DELIMITER ';' csv header" % csv_data)
    con.commit()
    
  • 执行

  • results = engine.execute(do_nothing_stmt)
    # Get number of rows inserted
    rowcount = results.rowcount
    

    警告:

    这种方法不能直接处理 NaT

    全部内容

    tst_df = pd.DataFrame({'colA':['a','b','c','a','z', 'q'],
                  'colB': pd.date_range(end=datetime.datetime.now() , periods=6),
                  'colC' : ['a1','b2','c3','a4','z5', 'q6']})
    
    
    insrt_vals = tst_df.to_dict(orient='records')
    engine =      sqlalchemy.create_engine("postgresql://user:password@localhost/postgres")
    connect = engine.connect()
    meta = MetaData(bind=engine)
    meta.reflect(bind=engine)
    table = meta.tables['tstbl']
    insrt_stmnt = insert(table).values(insrt_vals)
    
    do_nothing_stmt  = insrt_stmnt.on_conflict_do_nothing(index_elements=['colA','colB'])
    results = engine.execute(do_nothing_stmt)
    

    使用psycopg2驱动程序和PostgreSQL中的COPY命令,可以在处理较大文件(接近1GB)时更快,因为它会关闭所有表索引,无需执行第二步和第三步。

    csv_data = os.path.realpath('test.csv')
    

    12

    这适用于Postgresql 10.5和Sqlalchemy 1.3.6:

    from sqlalchemy.dialects.postgresql import insert
    
    
    table_info = {
    'tableTime': '',
    'deploymentID': '',
    'tableData': ''
    }
    insert_table = insert(Table).values(table_info)
    insert_table_sql = insert_table.on_conflict_do_nothing(
      index_elements=['tableTime', 'deploymentID']
    )
    db.session.execute(insert_table_sql)
    db.session.commit()
    

    这应该被接受为正确答案。 - undefined

    7

    这适用于Postgresql 9.5:

    from sqlalchemy.ext.compiler import compiles
    from sqlalchemy.sql.expression import Insert
    
    @compiles(Insert)
    def prefix_inserts(insert, compiler, **kw):
        return compiler.visit_insert(insert, **kw) + " ON CONFLICT DO NOTHING"
    

    我使用它来执行bulk_insert_mappings。然而,它并不会使ON CONFLICT DO NOTHING变为可选项。


    注意:我在这里添加了一个使用上下文管理器的版本,以使其成为可选项:https://dev59.com/PFwX5IYBdhLWcg3w2Shr#62305344 - WirthLuce
    如果原始插入语句已经包含'on conflict...'和/或'returning'子句,则此操作将失败。 - Anatoly Alekseev

    4

    这是 Niklas 的答案的扩展。

    基本上,使用线程本地状态和上下文管理器使追加 ON CONFLICT DO NOTHING 变成可选项。不过这仍然是一个很大的 hack。

    它只钩住了特定于 postgres 的语句,没有从文本手动构建 SQL 查询。

    import threading
    from contextlib import contextmanager
    
    from sqlalchemy.ext.compiler import compiles
    from sqlalchemy.sql.expression import Insert
    from sqlalchemy.dialects.postgresql.dml import OnConflictDoNothing
    
    state = threading.local()
    
    @contextmanager
    def on_conflict_do_nothing():
      state.active = True
      yield
      del state.active
    
    @compiles(Insert, 'postgresql')
    def prefix_inserts(insert, compiler, **kw):
      if getattr(state, "active", False):
        insert._post_values_clause = OnConflictDoNothing()
      return compiler.visit_insert(insert, **kw)
    

    如何更改代码,以便优雅地处理现有“on conflict”子句的情况? - Anatoly Alekseev

    1
    "您不需要这个,使用exists条件可以防止插入重复项。
    例如:"
    INSERT INTO table (unique_name) 
    SELECT 'some_string'
    WHERE NOT EXISTS(SELECT 1 FROM table WHERE unique_name = 'some_string')
    

    你也可以。
    INSERT INTO table (unique_name)
    VALUES('some_string')
    ON CONFLICT (unique_name) DO NOTHING
    

    但是,如果您需要在单个查询中插入或更新记录,那么这个例子就适合您:
    INSERT INTO distributors (did, dname)
    VALUES (5, 'Gizmo Transglobal'), (6, 'Associated Computing, Inc')
    ON CONFLICT (did) DO UPDATE SET dname = EXCLUDED.dname;
    

    这是 PostgreSQL 文档的一个示例。

    可以提供示例代码吗?如何使用exists条件在一条语句中进行插入或更新? - Kiran Jonnalagadda
    错过了投票。已修复。但无法将其标记为正确答案,因为我没有提出问题。 - Kiran Jonnalagadda
    11
    我提出了这个问题,但我仍然不理解如何在 sqlalchemy 中运作。 - puredevotion
    你是在问:如何在SQLAlchemy中使用SQL吗?如果是,请阅读这个链接。如果不是,请发布你的代码,因为我不知道你的代码是如何工作的。 - Adam Silenko
    我会使用文本SQL来完成,我猜这样做没问题? - puredevotion

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接