动机
我有一些数据来自一个接口,我使用pandas DataFrame进行接口处理。 我有一个由SQLAlchemy ORM接口的数据模型。 出于MCVE的缘故,我将数据模型规范化为两个表:
channel
存储记录的元数据(小容量,约1k行);record
指向channel
的记录(较高容量,每天90k行)。
channel
的目的是避免重复。我想要的是使用SQLAlchemy设置pythonic的数据插入到record
表中,但数据源不知道 channelid
的约束条件。
数据源
这里是数据源的一个样本(我能访问的唯一数据):
import pandas as pd
recs = [
{'serial': '1618741320', 'source': 1, 'channel': 4, 'timestamp': pd.Timestamp('2019-01-01 08:35:00'), 'value': 12},
{'serial': '1350397285', 'source': 2, 'channel': 3, 'timestamp': pd.Timestamp('2019-01-01 09:20:00'), 'value': 37},
{'serial': '814387724', 'source': 2, 'channel': 1, 'timestamp': pd.Timestamp('2019-01-01 12:30:00'), 'value': 581},
{'serial': '545914014', 'source': 3, 'channel': 0, 'timestamp': pd.Timestamp('2019-01-01 01:45:00'), 'value': 0},
{'serial': '814387724', 'source': 0, 'channel': 5, 'timestamp': pd.Timestamp('2019-01-01 14:20:00'), 'value': 699}
]
data = pd.DataFrame(recs)
这里是从设置中学习到的存储在 channel
中的元信息样例。
recs = [
{'channelid': 28, 'serial': '545914014', 'source': 3, 'channel': 0},
{'channelid': 73, 'serial': '1350397285', 'source': 2, 'channel': 3},
{'channelid': 239, 'serial': '1618741320', 'source': 1, 'channel': 4},
{'channelid': 245, 'serial': '814387724', 'source': 0, 'channel': 5},
{'channelid': 259, 'serial': '814387724', 'source': 2, 'channel': 1}
]
meta= pd.DataFrame(recs)
MCVE
首先,让我们从一个MCVE开始!
我们定义数据模型:
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, Float, String, DateTime
from sqlalchemy import UniqueConstraint, ForeignKey
from sqlalchemy.orm import relationship
Base = declarative_base()
Engine = create_engine("postgresql://postgres:postgres@localhost:5432/postgres")
class Channel(Base):
__tablename__ = 'channel'
__table_args__ = (UniqueConstraint('serial', 'source', 'channel'),)
id = Column(Integer, primary_key=True)
serial = Column(String, nullable=False)
source = Column(Integer, nullable=False)
channel = Column(Integer, nullable=False)
class Record(Base):
__tablename__ = 'record'
__table_args__ = (UniqueConstraint('channelid', 'timestamp'),)
id = Column(Integer, primary_key=True)
channelid = Column(Integer, ForeignKey('channel.id'), nullable=False)
timestamp = Column(DateTime, nullable=False)
value = Column(Float, nullable=False)
channel = relationship("Channel")
Base.metadata.drop_all(Engine)
Base.metadata.create_all(Engine)
我们将channel
表提供的元数据进行更新:
with Engine.connect() as dbcon:
dbcon.execute(Channel.__table__.insert(), meta.to_dict(orient='records'))
需要解决的问题
现在我们想要轻松地将data
插入到record
表中,但不幸的是我们缺少来自数据源的channelid
(数据源并不知道它)。显然这个调用会失败:
with Engine.connect() as dbcon:
with dbcon.begin() as dbtrans:
dbcon.execute(Record.__table__.insert(), data.to_dict(orient='records'))
dbtrans.commit()
因为:
IntegrityError: (psycopg2.errors.NotNullViolation) null value in column "channelid" violates not-null constraint
DETAIL: Failing row contains (6, null, 2019-01-01 08:35:00, 12).
[SQL: 'INSERT INTO record (timestamp, value) VALUES (%(timestamp)s, %(value)s)'] [parameters: ({'timestamp': Timestamp('2019-01-01 08:35:00'), 'value': 12}, {'timestamp': Timestamp('2019-01-01 09:20:00'), 'value': 37}, {'timestamp': Timestamp('2019-01-01 12:30:00'), 'value': 581}, {'timestamp': Timestamp('2019-01-01 01:45:00'), 'value': 0}, {'timestamp': Timestamp('2019-01-01 14:20:00'), 'value': 699})]
我们可以使用pandas处理它:
meta = pd.read_sql("SELECT id AS channelid, serial, source, channel FROM channel;", Engine.connect())
full = data.merge(meta, on=['serial', 'source', 'channel'])
由于与channelid
的关联已完成,因此前面的调用将起作用:
channel serial source timestamp value channelid
0 4 1618741320 1 2019-01-01 08:35:00 12 239
1 3 1350397285 2 2019-01-01 09:20:00 37 73
2 1 814387724 2 2019-01-01 12:30:00 581 259
3 0 545914014 3 2019-01-01 01:45:00 0 28
4 5 814387724 0 2019-01-01 14:20:00 699 245
但我认为这不是解决问题的正确方式,主要是因为这会让我使用pandas而不是SQLAlchemy进行绑定。
我也尝试过这种方法,但对于9万条记录的数据集来说效率非常低下:
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=Engine)
session = Session()
with session.begin_nested() as trans:
for rec in data.to_dict(orient='records'):
c = session.query(Channel).filter_by(**{k: rec.pop(k) for k in ['serial', 'source', 'channel']}).first()
r = Record(channelid=c.id, **rec)
session.add(r)
使用DataFrame的方法相比以前的方法要慢近100倍。
问题
我花费了大量精力构建一个完整的MCVE,因为我比较熟悉pandas而不是SQLAlchemy,并且在SQLAlchemy文档中找不到解决我的问题的方法。
我的问题是:“如何解决channelid以使我的插入成功,在性能上依赖于SQLAclhemy而不是pandas?"
请随意发表评论以改进此帖子。我要寻找的是一种合理的方法。它可以涉及更新数据模型,我有这种灵活性。
更新
阅读更多关于SQLAlchemy和测试提案@ Ramasubramanian S
的内容,我能够实现的最好的结果是:
ukeys = ['serial', 'source', 'channel']
with session.begin_nested() as trans:
g = data.groupby(ukeys)
for key in g.groups:
recs = []
for rec in data.loc[g.groups[key],:].to_dict(orient='records'):
m = {k: rec.pop(k) for k in ukeys}
c = session.query(Channel).filter_by(**m).first()
#r = Record(channel=c, **rec)
r = Record(channelid=c.id, **rec) # Bulk Insert needs explicit id not a relationship
recs.append(r)
#session.add_all(recs)
session.bulk_save_objects(recs) # Not working w/ relationship
使用关系 Record(channel=c, **rec)
,方法session.bulk_save_objects
会引发以下异常:
IntegrityError: (psycopg2.IntegrityError) ERREUR: une valeur NULL viole la contrainte NOT NULL de la colonne « channelid »
DETAIL: La ligne en échec contient (1, null, 2019-01-01 08:35:00, 12)
[SQL: INSERT INTO record (timestamp, value) VALUES (%(timestamp)s, %(value)s)]
[parameters: ({'timestamp': Timestamp('2019-01-01 08:35:00'), 'value': 12}, {'timestamp': Timestamp('2019-01-01 09:20:00'), 'value': 37}, {'timestamp': Timestamp('2019-01-01 12:30:00'), 'value': 581}, {'timestamp': Timestamp('2019-01-01 01:45:00'), 'value': 0}, {'timestamp': Timestamp('2019-01-01 14:20:00'), 'value': 699})]
(Background on this error at: http://sqlalche.me/e/gkpj)
然后将channelid
设置为null
,它似乎无法使用relationship
功能,因此我们需要明确传递channelid
以使其工作。
filter(Record.serial==Channel.serial) filter(Record.source==Channel.source) filter (Record.channel==Channel.channel )```
- FiercestJim