如何使用SqlAlchemy执行upsert操作?

135

如果数据库中不存在一条记录,我希望它被创建,如果该记录已存在(主键已存在),则将其字段更新为当前状态。这通常称为upsert

下面这段不完整的代码片段可以实现此功能,但似乎有些笨重(特别是如果有更多列的情况下)。有没有更好/更佳的方法?

Base = declarative_base()
class Template(Base):
    __tablename__ = 'templates'
    id = Column(Integer, primary_key = True)
    name = Column(String(80), unique = True, index = True)
    template = Column(String(80), unique = True)
    description = Column(String(200))
    def __init__(self, Name, Template, Desc):
        self.name = Name
        self.template = Template
        self.description = Desc

def UpsertDefaultTemplate():
    sess = Session()
    desired_default = Template("default", "AABBCC", "This is the default template")
    try:
        q = sess.query(Template).filter_by(name = desiredDefault.name)
        existing_default = q.one()
    except sqlalchemy.orm.exc.NoResultFound:
        #default does not exist yet, so add it...
        sess.add(desired_default)
    else:
        #default already exists.  Make sure the values are what we want...
        assert isinstance(existing_default, Template)
        existing_default.name = desired_default.name
        existing_default.template = desired_default.template
        existing_default.description = desired_default.description
    sess.flush()

有没有更好或更简洁的方法来完成这个任务?类似下面这样的东西就很好:

sess.upsert_this(desired_default, unique_key = "name")

虽然unique_key参数显然是不必要的(ORM应该能够轻松地找出这一点),但我添加了它,因为SQLAlchemy倾向于仅使用主键。例如:我一直在研究是否可以使用Session.merge来实现这个目的,但这只适用于主键,在这种情况下,它是一个自增的id,对于此目的并不是非常有用。

一个简单的用例是当启动服务器应用程序时可能已经升级了其默认预期数据。即:对于此upsert,不存在并发问题。


3
如果name字段是唯一的,为什么不能将其设为主键(在这种情况下合并会起作用)?为什么需要一个单独的主键? - abbot
15
@abbot:我不想卷入关于 ID 字段的争论,但是……简单回答一下就是“外键”。更详细点说,虽然名称确实是唯一必需的主键,但存在两个问题。1)当一个模板记录被另一个表中的 5000 万条记录引用时,把该外键作为字符串字段是不可取的。索引整数更好,因此看起来没有意义的 id 列就有了用武之地。2)进一步说,如果使用字符串作为 FK,那么在名称更改时需要更新两个位置,这很麻烦且容易出现关系失效的问题。而 id 永远不会改变。 - Russ
你可以尝试一下新的(beta版)Python upsert库,它兼容psycopg2、sqlite3和MySQLdb。 - Seamus Abshere
11个回答

0

我使用这段代码进行upsert操作,在使用之前,您应该在数据库表中添加主键。

from sqlalchemy import create_engine
from sqlalchemy import MetaData, Table
from sqlalchemy.inspection import inspect
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.dialects.postgresql import insert

def upsert(df, engine, table_name, schema=None, chunk_size = 1000):

    metadata = MetaData(schema=schema)
    metadata.bind = engine

    table = Table(table_name, metadata, schema=schema, autoload=True)
    
   # olny use common columns between df and table.
    table_columns = {column.name for column in table.columns}
    df_columns = set(df.columns)
    intersection_columns = table_columns.intersection(df_columns)
    
    df1 = df[intersection_columns] 
    records  = df1.to_dict('records')

    # get list of fields making up primary key
    primary_keys = [key.name for key in inspect(table).primary_key]
    

    with engine.connect() as conn:
        chunks = [records[i:i + chunk_size] for i in range(0, len(records), chunk_size)]
        for chunk in chunks:
            stmt = insert(table).values(chunk)
            update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
            s = stmt.on_conflict_do_update(
                index_elements= primary_keys,
                set_=update_dict)
            conn.execute(s)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接