如何使用SQLAlchemy在插入时解决相关外键问题?

10

动机

我有一些数据来自一个接口,我使用pandas DataFrame进行接口处理。 我有一个由SQLAlchemy ORM接口的数据模型。 出于MCVE的缘故,我将数据模型规范化为两个表:

  • channel 存储记录的元数据(小容量,约1k行);
  • record 指向channel的记录(较高容量,每天90k行)。

channel 的目的是避免重复。我想要的是使用SQLAlchemy设置pythonic的数据插入到record 表中,但数据源不知道 channelid 的约束条件。

数据源

这里是数据源的一个样本(我能访问的唯一数据):

import pandas as pd
recs = [
    {'serial': '1618741320', 'source': 1, 'channel': 4, 'timestamp': pd.Timestamp('2019-01-01 08:35:00'), 'value': 12},
    {'serial': '1350397285', 'source': 2, 'channel': 3, 'timestamp': pd.Timestamp('2019-01-01 09:20:00'), 'value': 37},
    {'serial': '814387724', 'source': 2, 'channel': 1, 'timestamp': pd.Timestamp('2019-01-01 12:30:00'), 'value': 581},
    {'serial': '545914014', 'source': 3, 'channel': 0, 'timestamp': pd.Timestamp('2019-01-01 01:45:00'), 'value': 0},
    {'serial': '814387724', 'source': 0, 'channel': 5, 'timestamp': pd.Timestamp('2019-01-01 14:20:00'), 'value': 699}
]
data = pd.DataFrame(recs)

这里是从设置中学习到的存储在 channel 中的元信息样例。

recs = [
    {'channelid': 28, 'serial': '545914014', 'source': 3, 'channel': 0},
    {'channelid': 73, 'serial': '1350397285', 'source': 2, 'channel': 3},
    {'channelid': 239, 'serial': '1618741320', 'source': 1, 'channel': 4},
    {'channelid': 245, 'serial': '814387724', 'source': 0, 'channel': 5},
    {'channelid': 259, 'serial': '814387724', 'source': 2, 'channel': 1}
]
meta= pd.DataFrame(recs)

MCVE

首先,让我们从一个MCVE开始!

我们定义数据模型:

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, Float, String, DateTime
from sqlalchemy import UniqueConstraint, ForeignKey
from sqlalchemy.orm import relationship

Base = declarative_base()
Engine = create_engine("postgresql://postgres:postgres@localhost:5432/postgres")

class Channel(Base):
    __tablename__ = 'channel'
    __table_args__ = (UniqueConstraint('serial', 'source', 'channel'),)
    id = Column(Integer, primary_key=True)
    serial = Column(String, nullable=False)
    source = Column(Integer, nullable=False)
    channel = Column(Integer, nullable=False)

class Record(Base):
    __tablename__ = 'record'
    __table_args__ = (UniqueConstraint('channelid', 'timestamp'),)
    id = Column(Integer, primary_key=True)
    channelid = Column(Integer, ForeignKey('channel.id'), nullable=False)
    timestamp = Column(DateTime, nullable=False)
    value = Column(Float, nullable=False)
    channel = relationship("Channel")

Base.metadata.drop_all(Engine)
Base.metadata.create_all(Engine)

我们将channel表提供的元数据进行更新:

with Engine.connect() as dbcon:
    dbcon.execute(Channel.__table__.insert(), meta.to_dict(orient='records'))

需要解决的问题

现在我们想要轻松地将data插入到record表中,但不幸的是我们缺少来自数据源的channelid(数据源并不知道它)。显然这个调用会失败:

with Engine.connect() as dbcon:
    with dbcon.begin() as dbtrans:
        dbcon.execute(Record.__table__.insert(), data.to_dict(orient='records'))
        dbtrans.commit()

因为:

IntegrityError: (psycopg2.errors.NotNullViolation) null value in column "channelid" violates not-null constraint
DETAIL:  Failing row contains (6, null, 2019-01-01 08:35:00, 12).
 [SQL: 'INSERT INTO record (timestamp, value) VALUES (%(timestamp)s, %(value)s)'] [parameters: ({'timestamp': Timestamp('2019-01-01 08:35:00'), 'value': 12}, {'timestamp': Timestamp('2019-01-01 09:20:00'), 'value': 37}, {'timestamp': Timestamp('2019-01-01 12:30:00'), 'value': 581}, {'timestamp': Timestamp('2019-01-01 01:45:00'), 'value': 0}, {'timestamp': Timestamp('2019-01-01 14:20:00'), 'value': 699})]

我们可以使用pandas处理它:

meta = pd.read_sql("SELECT id AS channelid, serial, source, channel FROM channel;", Engine.connect())
full = data.merge(meta, on=['serial', 'source', 'channel'])

由于与channelid的关联已完成,因此前面的调用将起作用:

   channel      serial  source           timestamp  value  channelid
0        4  1618741320       1 2019-01-01 08:35:00     12        239
1        3  1350397285       2 2019-01-01 09:20:00     37         73
2        1   814387724       2 2019-01-01 12:30:00    581        259
3        0   545914014       3 2019-01-01 01:45:00      0         28
4        5   814387724       0 2019-01-01 14:20:00    699        245

但我认为这不是解决问题的正确方式,主要是因为这会让我使用pandas而不是SQLAlchemy进行绑定。

我也尝试过这种方法,但对于9万条记录的数据集来说效率非常低下:

from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=Engine)
session = Session()
with session.begin_nested() as trans:
    for rec in data.to_dict(orient='records'):
        c = session.query(Channel).filter_by(**{k: rec.pop(k) for k in ['serial', 'source', 'channel']}).first()
        r = Record(channelid=c.id, **rec)
        session.add(r)

使用DataFrame的方法相比以前的方法要慢近100倍。

问题

我花费了大量精力构建一个完整的MCVE,因为我比较熟悉pandas而不是SQLAlchemy,并且在SQLAlchemy文档中找不到解决我的问题的方法。

我的问题是:“如何解决channelid以使我的插入成功,在性能上依赖于SQLAclhemy而不是pandas?"

请随意发表评论以改进此帖子。我要寻找的是一种合理的方法。它可以涉及更新数据模型,我有这种灵活性。

更新

阅读更多关于SQLAlchemy和测试提案@ Ramasubramanian S 的内容,我能够实现的最好的结果是:

ukeys = ['serial', 'source', 'channel']
with session.begin_nested() as trans:
    g = data.groupby(ukeys)
    for key in g.groups:
        recs = []
        for rec in data.loc[g.groups[key],:].to_dict(orient='records'):
            m = {k: rec.pop(k) for k in ukeys}
            c = session.query(Channel).filter_by(**m).first()
            #r = Record(channel=c, **rec)  
            r = Record(channelid=c.id, **rec) # Bulk Insert needs explicit id not a relationship
            recs.append(r)
        #session.add_all(recs)
        session.bulk_save_objects(recs) # Not working w/ relationship

使用关系 Record(channel=c, **rec),方法session.bulk_save_objects会引发以下异常:

IntegrityError: (psycopg2.IntegrityError) ERREUR:  une valeur NULL viole la contrainte NOT NULL de la colonne « channelid »
DETAIL:  La ligne en échec contient (1, null, 2019-01-01 08:35:00, 12)

[SQL: INSERT INTO record (timestamp, value) VALUES (%(timestamp)s, %(value)s)]
[parameters: ({'timestamp': Timestamp('2019-01-01 08:35:00'), 'value': 12}, {'timestamp': Timestamp('2019-01-01 09:20:00'), 'value': 37}, {'timestamp': Timestamp('2019-01-01 12:30:00'), 'value': 581}, {'timestamp': Timestamp('2019-01-01 01:45:00'), 'value': 0}, {'timestamp': Timestamp('2019-01-01 14:20:00'), 'value': 699})]
(Background on this error at: http://sqlalche.me/e/gkpj)

然后将channelid设置为null,它似乎无法使用relationship功能,因此我们需要明确传递channelid以使其工作。


我对SQL Alchemy并不是特别熟悉,但是浏览了一下文档后发现它是SQL,对吧?你不能分别创建这两个表然后再将它们连接起来吗?就像这样: filter(Record.serial==Channel.serial) filter(Record.source==Channel.source) filter (Record.channel==Channel.channel )``` - FiercestJim
@FiercestJim,这是一个插入查询,此时记录不存在。 - jlandercy
是的,我理解了。如果SQLAlchemy使用普通的SQL逻辑,我不明白如何在不先单独创建它们的情况下从两个不同的源创建一个表,并在指定的键上将这两个表连接在一起。这个连接的结果就是你想要的表。 在你的pandas示例中,我认为你正在做这件事。 - FiercestJim
@FiercestJim,我在想ORM是否有一种方法可以在插入时解析外键,而无需使用额外的数据结构或临时表。 - jlandercy
2个回答

5

提高插入多条记录的性能的一种方法是使用bulk_save_objectsbulk_insert_mappings批量创建对象并插入到数据库中。

此链接展示了不同插入多条记录方法的性能比较。

您可以在这里找到类似的答案。

干杯!


谢谢您的回答,您能详细说明一下您的解决方案如何解决外键关系吗? - jlandercy
根据您帖子中的示例,您需要创建所有的Channel对象并使用bulk_save_objects批量插入它们。然后创建Record对象列表并重复相同的步骤。在我的答案中有一个类似用例的问题链接,其中包含一个很好的代码示例。 - Ramasubramanian S
我已经看到了它(基本上在我的MCVE中做了同样的事情)。这是否意味着没有自动外键解析,而是由我实现解析逻辑的责任。 - jlandercy
如果你所说的“自动外键解析”是指用一个语句向表中插入数据,我不确定是否可能。但你的最小可复现例和我的建议之间的差异在于,你会一次性将表1的对象列表插入,而你会创建表2的对象列表并执行相同的操作。使用其中的一个“bulk_”方法可以显著提高性能,这也是我认为这个问题的关键点。 - Ramasubramanian S
我已经测试了您的建议,并相应地更新了我的帖子。主要事实是bulk_save_objects无法使用relationship功能。对我来说有点模糊,不太理解选择哪个选项最好。 - jlandercy

3

我认为解决方案的关键在于你的陈述

通道保存有关记录的元数据(小容量,约1k行);

既然您说这并不太高,我建议将其缓存在内存中。

channels = session.query(Channel).all()

channel_map = {}
for c in channels:
    channel_map['-'.join([c.serial, c.source, c.channel])] = c.id

现在您可以进行批量更新或者您偏爱的任何其他方式。
with session.begin_nested() as trans:
    recs = []
    for rec in data.to_dict(orient='records'):
        channel_id = channel_map['-'.join([rec['serial'], rec['source'], rec['channel']])]
        r = Record(channelid=channel_id, **rec) # Bulk Insert needs explicit id not a relationship
        recs.append(r)
    session.add_all(recs)

注意:以上是未经测试的代码,但意图是展示可能的方法。

谢谢您的回答。我不确定'-'.join([c.serial, c.source, c.channel])的唯一性,因为serial是一个没有限制的字符串。我们可以使用元组(c.serial, c.source, c.channel)代替,它可以正确地进行哈希(所有元素都是不可变的)。我选择按数据框分组,以便只创建所需的Channel对象一次,而不是使用缓存。 - jlandercy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接