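Solving your problem as elegantly as possible requires some very advanced SQLAlchemy techniques. I know you're a beginner, but this answer will take you all the way to the end. Still, a problem like this has to be worked through one step at a time, and along the way there are different ways to get the answer you want.
Before getting into how to turn this into a hybrid, you need to think about the SQL. How would we query for Time.cost across an arbitrary set of rows? We can link Time to Person easily because there's a simple foreign key. But linking Time to Payrate with this particular schema is tricky, because Time relates to Payrate not just through person_id but also through workedon. In SQL, the easiest way to join them would be "time.person_id = payrate.person_id AND time.workedon BETWEEN payrate.start_date AND payrate.end_date". But you don't have an "end_date" here, which means we have to derive it as well. That derivation is the trickiest part, so here is what I came up with (I've lowercased your column names):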
SELECT payrate.person_id, payrate.hourly, payrate.starting, ending.ending
FROM payrate LEFT OUTER JOIN
(SELECT pa1.payrate_id, MIN(pa2.starting) as ending FROM payrate AS pa1
JOIN payrate AS pa2 ON pa1.person_id = pa2.person_id AND pa2.starting > pa1.starting
GROUP BY pa1.payrate_id
) AS ending ON payrate.payrate_id=ending.payrate_id
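This might not be the only way to do it, but it's the one I came up with; other approaches would almost certainly involve something similar (i.e. subqueries and joins).
So with a starting/ending payrate, we can work out what the query looks like. We want to use BETWEEN to match a time entry to a date range, but the latest payrate row will have NULL for its "ending" date, so one way to deal with that is to COALESCE it against a very high date (another is to use a conditional):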
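To check the derivation by hand, here is a minimal sketch that runs the same "ending" SQL against an in-memory SQLite database through Python's built-in sqlite3 module, using the three sample payrates from the complete example further down. SQLite, and storing dates as ISO-8601 text, are assumptions for the demo only; ISO date strings sort chronologically, so MIN() and > behave as they would on real DATE columns. An ORDER BY is added purely for stable output.

```python
import sqlite3

# Sample payrates from the complete example; dates stored as ISO-8601 text
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE payrate (
        payrate_id INTEGER PRIMARY KEY,
        person_id INTEGER,
        hourly INTEGER,
        starting TEXT,
        UNIQUE (person_id, starting)
    );
    INSERT INTO payrate VALUES (1, 1, 10, '2013-05-17');
    INSERT INTO payrate VALUES (2, 1, 15, '2013-05-25');
    INSERT INTO payrate VALUES (3, 1, 20, '2013-06-10');
""")

# The derived-"ending" query: each rate ends where the next one starts
rows = conn.execute("""
    SELECT payrate.person_id, payrate.hourly, payrate.starting, ending.ending
    FROM payrate LEFT OUTER JOIN
        (SELECT pa1.payrate_id, MIN(pa2.starting) AS ending
         FROM payrate AS pa1
         JOIN payrate AS pa2
           ON pa1.person_id = pa2.person_id AND pa2.starting > pa1.starting
         GROUP BY pa1.payrate_id) AS ending
      ON payrate.payrate_id = ending.payrate_id
    ORDER BY payrate.starting
""").fetchall()

for row in rows:
    print(row)
```

The latest rate has no later row to pair with, so the LEFT OUTER JOIN leaves its ending as NULL (None in Python), which is exactly the case the COALESCE below has to handle.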
SELECT *, entry.hours * payrate_derived.hourly
FROM entry
JOIN
(SELECT payrate.person_id, payrate.hourly, payrate.starting, ending.ending
FROM payrate LEFT OUTER JOIN
(SELECT pa1.payrate_id, MIN(pa2.starting) as ending FROM payrate AS pa1
JOIN payrate AS pa2 ON pa1.person_id = pa2.person_id AND pa2.starting > pa1.starting
GROUP BY pa1.payrate_id
) AS ending ON payrate.payrate_id=ending.payrate_id) as payrate_derived
ON entry.workedon BETWEEN payrate_derived.starting AND COALESCE(payrate_derived.ending, '9999-12-31')
AND entry.person_id=payrate_derived.person_id
ORDER BY entry.person_id, entry.workedon
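Here is the same query run end to end, again as a sketch on in-memory SQLite with the sample data from the complete example (SQLite and the text-stored dates are demo assumptions). It selects a few named columns instead of *, and uses a single-quoted '9999-12-31' string literal, since in standard SQL and PostgreSQL double quotes denote identifiers, not strings. The computed costs should line up with the 100, 75, 75, 240 shown in the output at the end.

```python
import sqlite3

# Sample data from the complete example; dates stored as ISO-8601 text
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE payrate (payrate_id INTEGER PRIMARY KEY, person_id INTEGER,
                          hourly INTEGER, starting TEXT);
    CREATE TABLE entry (entry_id INTEGER PRIMARY KEY, person_id INTEGER,
                        workedon TEXT, hours INTEGER);
    INSERT INTO payrate VALUES (1, 1, 10, '2013-05-17'),
                               (2, 1, 15, '2013-05-25'),
                               (3, 1, 20, '2013-06-10');
    INSERT INTO entry VALUES (1, 1, '2013-05-19', 10),
                             (2, 1, '2013-05-27', 5),
                             (3, 1, '2013-05-30', 5),
                             (4, 1, '2013-06-18', 12);
""")

# Join each entry to the derived rate range that contains its date
rows = conn.execute("""
    SELECT entry.workedon, entry.hours, payrate_derived.hourly,
           entry.hours * payrate_derived.hourly AS cost
    FROM entry
    JOIN (SELECT payrate.person_id, payrate.hourly, payrate.starting, ending.ending
          FROM payrate LEFT OUTER JOIN
              (SELECT pa1.payrate_id, MIN(pa2.starting) AS ending
               FROM payrate AS pa1
               JOIN payrate AS pa2
                 ON pa1.person_id = pa2.person_id AND pa2.starting > pa1.starting
               GROUP BY pa1.payrate_id) AS ending
            ON payrate.payrate_id = ending.payrate_id) AS payrate_derived
      ON entry.workedon BETWEEN payrate_derived.starting
                            AND COALESCE(payrate_derived.ending, '9999-12-31')
     AND entry.person_id = payrate_derived.person_id
    ORDER BY entry.person_id, entry.workedon
""").fetchall()

for row in rows:
    print(row)
```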
What @hybrid can do for you in SQLAlchemy, at the SQL expression level, is only the "entry.hours * payrate_derived.hourly" part. All of the JOINs and so on have to be supplied to the hybrid from the outside.
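To make that concrete, here is a deliberately tiny, hypothetical re-creation of the mechanism in plain Python. It is not SQLAlchemy's hybrid_property, and toy_hybrid and Col are made-up names: a descriptor that calls one function when accessed on an instance and another when accessed on the class. The class-level result is just an expression that mentions payrate_derived; nothing in it says how payrate_derived gets into the FROM clause, which is why the query has to provide that join.

```python
class toy_hybrid(object):
    """Hypothetical, stripped-down stand-in for a hybrid property."""

    def __init__(self, fget, fexpr=None):
        self.fget = fget
        self.fexpr = fexpr if fexpr is not None else fget

    def expression(self, fexpr):
        # Keep the instance-level getter, swap in a class-level one
        return toy_hybrid(self.fget, fexpr)

    def __get__(self, instance, owner):
        if instance is None:
            return self.fexpr(owner)  # Time.cost -> SQL-ish expression
        return self.fget(instance)    # time.cost -> plain Python value


class Col(object):
    """Toy column: multiplying two of them renders SQL text."""

    def __init__(self, sql):
        self.sql = sql

    def __mul__(self, other):
        return Col("%s * %s" % (self.sql, other.sql))

    def __str__(self):
        return self.sql


payrate_derived_hourly = Col("payrate_derived.hourly")


class Time(object):
    hours = Col("entry.hours")

    def __init__(self, hours, hourly):
        self.hours = hours    # instance value shadows the class-level Col
        self.hourly = hourly  # stands in for self.payrate.hourly

    @toy_hybrid
    def cost(self):
        return self.hours * self.hourly

    @cost.expression
    def cost(cls):
        return cls.hours * payrate_derived_hourly


print(Time.cost)          # entry.hours * payrate_derived.hourly
print(Time(10, 10).cost)  # 100
```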
So we need to plug that big subquery in here:
class Time(...):
    @hybrid_property
    def cost(self):
        # Python side: filled in later
        ...

    @cost.expression
    def cost(cls):
        return cls.hours * <SOMETHING>.hourly
So let's figure out what <SOMETHING> is. Build that SELECT as an object:
from sqlalchemy.orm import aliased, join, outerjoin
from sqlalchemy import and_, func, select

pa1 = aliased(Payrate)
pa2 = aliased(Payrate)

# For each payrate row, the earliest later "starting" for the same person
ending = select([pa1.payrate_id, func.min(pa2.starting).label('ending')]).\
    select_from(join(pa1, pa2, and_(pa1.person_id == pa2.person_id,
                                    pa2.starting > pa1.starting))).\
    group_by(pa1.payrate_id).alias()

# Every payrate row plus its derived "ending" (NULL for the latest rate)
payrate_derived = select([Payrate.person_id, Payrate.hourly, Payrate.starting, ending.c.ending]).\
    select_from(outerjoin(Payrate, ending, Payrate.payrate_id == ending.c.payrate_id)).alias()
On the expression side, our cost() hybrid needs to refer to payrate_derived (we'll handle the Python side in a moment):
class Time(...):
    @hybrid_property
    def cost(self):
        # Python side: filled in later
        ...

    @cost.expression
    def cost(cls):
        return cls.hours * payrate_derived.c.hourly
Then, to use our cost() hybrid, it has to be used in the context of a query that has that join. Note that here we use Python's datetime.date.max to get the maximum date (very handy!):
import datetime

# The query must supply the join to payrate_derived itself
print(
    session.query(Person.name, Time.workedon, Time.hours, Time.cost)
    .select_from(Time)
    .join(Time.person)
    .join(payrate_derived,
          and_(
              payrate_derived.c.person_id == Time.person_id,
              Time.workedon.between(
                  payrate_derived.c.starting,
                  func.coalesce(
                      payrate_derived.c.ending,
                      datetime.date.max
                  )
              )
          ))
    .all()
)
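One detail worth checking in that join condition: SQL's BETWEEN (and SQLAlchemy's between()) is inclusive at both ends, and each derived ending is the next rate's starting date. A time entry dated exactly on the day a new rate begins therefore matches two payrate rows, which also sits badly with the uselist=False relationship used later. The sample data never hits such a day, so the output shown is unaffected. The sketch below shows the overlap on SQLite and a half-open comparison (starting <= workedon < ending) that avoids it; the payrate_derived table here is a hypothetical stand-in holding the rows the subquery produces, and datetime.date.max.isoformat() supplies the same high date as a bound parameter.

```python
import datetime
import sqlite3

# Hypothetical table holding the rows the payrate_derived subquery yields for p1
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE payrate_derived (person_id INTEGER, hourly INTEGER,
                                  starting TEXT, ending TEXT);
    INSERT INTO payrate_derived VALUES (1, 10, '2013-05-17', '2013-05-25'),
                                       (1, 15, '2013-05-25', '2013-06-10'),
                                       (1, 20, '2013-06-10', NULL);
""")

# Inclusive at both ends, like BETWEEN / between()
BETWEEN_SQL = """
    SELECT hourly FROM payrate_derived
    WHERE person_id = 1
      AND :workedon BETWEEN starting AND COALESCE(ending, :high)
    ORDER BY starting
"""

# Half-open range: starting <= workedon < ending
HALF_OPEN_SQL = """
    SELECT hourly FROM payrate_derived
    WHERE person_id = 1
      AND :workedon >= starting AND :workedon < COALESCE(ending, :high)
    ORDER BY starting
"""


def rates(sql, workedon):
    """Hourly rates whose range matches the ISO date string workedon."""
    params = {"workedon": workedon, "high": datetime.date.max.isoformat()}
    return [hourly for (hourly,) in conn.execute(sql, params)]


print(rates(BETWEEN_SQL, "2013-05-25"))    # [10, 15]: two rows match
print(rates(HALF_OPEN_SQL, "2013-05-25"))  # [15]: only the new rate
```

In the SQLAlchemy code, the corresponding (untested) change would be to replace the between() call with and_(Time.workedon >= payrate_derived.c.starting, Time.workedon < func.coalesce(payrate_derived.c.ending, datetime.date.max)).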
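Since the join is big and awkward and we'll need to do it a lot, not to mention that we'll need to load that same row in Python when we evaluate the hybrid in Python, we can map it with relationship(). That means we have to set up a custom join condition, and we also need to actually map to the subquery, using a lesser-known technique called a non-primary mapper. A non-primary mapper lets you map a class to an arbitrary table or SELECT construct purely for the purpose of selecting rows. We normally don't need it, because Query already lets us query for arbitrary columns and subqueries, but to reach the subquery from a relationship(), it needs a mapping. The mapping needs a primary key defined, and the relationship also needs to know which side is "foreign". This is the most advanced part here, and in this case it works out like this: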
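import datetime

from sqlalchemy import and_, func
from sqlalchemy.orm import mapper, relationship, foreign

# Map Payrate a second time, onto the derived SELECT, for loading only.
# The derived rows have no payrate_id, so (person_id, starting) is the key.
payrate_derived_mapping = mapper(Payrate, payrate_derived, non_primary=True,
                                 primary_key=[
                                     payrate_derived.c.person_id,
                                     payrate_derived.c.starting
                                 ])

# foreign() marks Time.person_id as the "foreign" side of the custom join
Time.payrate = relationship(
    payrate_derived_mapping,
    viewonly=True,   # read-only; never used when flushing
    uselist=False,   # scalar: one rate per time entry
    primaryjoin=and_(
        payrate_derived.c.person_id == foreign(Time.person_id),
        Time.workedon.between(
            payrate_derived.c.starting,
            func.coalesce(
                payrate_derived.c.ending,
                datetime.date.max
            )
        )
    )
)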
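The primary_key=[person_id, starting] choice is sound only if no person has two rates starting on the same day, which is what the UniqueConstraint('person_id', 'starting') in the complete example below is for (the derived SELECT doesn't carry payrate_id, so that column can't be the key). A quick SQLite sketch of the guarantee such a constraint provides; the table definition is a simplified assumption mirroring the model.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE payrate (
        payrate_id INTEGER PRIMARY KEY,
        person_id INTEGER,
        hourly INTEGER,
        starting TEXT,
        UNIQUE (person_id, starting)
    )
""")
conn.execute("INSERT INTO payrate (person_id, hourly, starting) "
             "VALUES (1, 10, '2013-05-17')")

try:
    # A second rate for the same person and start date is rejected,
    # so (person_id, starting) identifies at most one derived row.
    conn.execute("INSERT INTO payrate (person_id, hourly, starting) "
                 "VALUES (1, 12, '2013-05-17')")
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

print(duplicate_rejected)  # True
```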
So that's the last we'll have to see of that join. We can now write our earlier query like this:
print(
    session.query(Person.name, Time.workedon, Time.hours, Time.cost)
    .select_from(Time)
    .join(Time.person)
    .join(Time.payrate)
    .all()
)
Finally, we can wire the new payrate relationship into the Python-level side of the hybrid as well:
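class Time(Base):
    # ... columns and relationships as before ...

    @hybrid_property
    def cost(self):
        # Python side: multiply by the rate loaded through Time.payrate
        return self.hours * self.payrate.hourly

    @cost.expression
    def cost(cls):
        # SQL side: the query must join to payrate_derived
        return cls.hours * payrate_derived.c.hourly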
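On the Python side, self.payrate is simply whatever row the relationship loaded, and the hybrid only multiplies. If it helps to see the lookup that the relationship's join condition expresses, here is a plain-Python sketch using the standard bisect module: the rate in effect is the one with the latest start date not after workedon. It is not SQLAlchemy and not what the relationship actually executes; RATES, hourly_on and cost are hypothetical names, and the sample rates come from the complete example.

```python
import bisect
import datetime

# Hypothetical sample: p1's rates as (starting, hourly), sorted by starting
RATES = [
    (datetime.date(2013, 5, 17), 10),
    (datetime.date(2013, 5, 25), 15),
    (datetime.date(2013, 6, 10), 20),
]
STARTS = [starting for starting, _ in RATES]


def hourly_on(workedon):
    """Hourly rate of the latest payrate starting on or before workedon."""
    i = bisect.bisect_right(STARTS, workedon) - 1
    if i < 0:
        return None  # worked before the first rate began
    return RATES[i][1]


def cost(hours, workedon):
    """Python-side cost; None when no rate applies (a choice made here)."""
    hourly = hourly_on(workedon)
    return None if hourly is None else hours * hourly


print(cost(10, datetime.date(2013, 5, 19)))  # 100
print(cost(12, datetime.date(2013, 6, 18)))  # 240
```

Returning None when no rate applies is a choice made in this sketch; the hybrid above would instead raise AttributeError in that case, because self.payrate would be None.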
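The solution we have now took a lot of effort, but at least the most complex part, the payrate mapping, lives entirely in one place and we never need to look at it again.
Here's a complete working example: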
from __future__ import print_function

import datetime

from sqlalchemy import create_engine, Column, Integer, ForeignKey, Date, \
    UniqueConstraint, select, func, and_, String
from sqlalchemy.orm import join, outerjoin, relationship, Session, \
    aliased, mapper, foreign
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.hybrid import hybrid_property

Base = declarative_base()


class Person(Base):
    __tablename__ = 'person'
    person_id = Column(Integer, primary_key=True)
    name = Column(String(30), unique=True)


class Payrate(Base):
    __tablename__ = 'payrate'
    payrate_id = Column(Integer, primary_key=True)
    person_id = Column(Integer, ForeignKey('person.person_id'))
    hourly = Column(Integer)
    starting = Column(Date)
    person = relationship("Person")
    # __table_args__ must be a tuple (note the trailing comma)
    __table_args__ = (UniqueConstraint('person_id', 'starting',
                                       name='uc_person_starting'),)


class Time(Base):
    __tablename__ = 'entry'
    entry_id = Column(Integer, primary_key=True)
    person_id = Column(Integer, ForeignKey('person.person_id'))
    workedon = Column(Date)
    hours = Column(Integer)
    person = relationship("Person")

    @hybrid_property
    def cost(self):
        # Python side: uses the Time.payrate relationship defined below
        return self.hours * self.payrate.hourly

    @cost.expression
    def cost(cls):
        # SQL side: the query must join to payrate_derived
        return cls.hours * payrate_derived.c.hourly


# Derive each rate's "ending": the next "starting" for the same person
pa1 = aliased(Payrate)
pa2 = aliased(Payrate)
ending = select([pa1.payrate_id, func.min(pa2.starting).label('ending')]).\
    select_from(join(pa1, pa2, and_(
        pa1.person_id == pa2.person_id,
        pa2.starting > pa1.starting))).\
    group_by(pa1.payrate_id).alias()

payrate_derived = select([Payrate.person_id, Payrate.hourly, Payrate.starting, ending.c.ending]).\
    select_from(outerjoin(Payrate, ending, Payrate.payrate_id == ending.c.payrate_id)).alias()

# Non-primary mapping of Payrate onto the derived SELECT
payrate_derived_mapping = mapper(Payrate, payrate_derived, non_primary=True,
                                 primary_key=[
                                     payrate_derived.c.person_id,
                                     payrate_derived.c.starting
                                 ])

# Read-only, scalar relationship to the rate in effect on "workedon"
Time.payrate = relationship(
    payrate_derived_mapping,
    viewonly=True,
    uselist=False,
    primaryjoin=and_(
        payrate_derived.c.person_id == foreign(Time.person_id),
        Time.workedon.between(
            payrate_derived.c.starting,
            func.coalesce(
                payrate_derived.c.ending,
                datetime.date.max
            )
        )
    )
)

e = create_engine("postgresql://scott:tiger@localhost/test", echo=False)
Base.metadata.drop_all(e)
Base.metadata.create_all(e)
session = Session(e)

p1 = Person(name='p1')
session.add(p1)
session.add_all([
    Payrate(hourly=10, starting=datetime.date(2013, 5, 17), person=p1),
    Payrate(hourly=15, starting=datetime.date(2013, 5, 25), person=p1),
    Payrate(hourly=20, starting=datetime.date(2013, 6, 10), person=p1),
])
session.add_all([
    Time(person=p1, workedon=datetime.date(2013, 5, 19), hours=10),
    Time(person=p1, workedon=datetime.date(2013, 5, 27), hours=5),
    Time(person=p1, workedon=datetime.date(2013, 5, 30), hours=5),
    Time(person=p1, workedon=datetime.date(2013, 6, 18), hours=12),
])
session.commit()

print(
    session.query(Person.name, Time.workedon, Time.hours, Time.cost)
    .select_from(Time)
    .join(Time.person)
    .join(Time.payrate)
    .all()
)

for time in session.query(Time):
    print(time.person.name, time.workedon, time.hours, time.payrate.hourly, time.cost)
Output (the first line is from the aggregate query; the rest are per-object):
[(u'p1', datetime.date(2013, 5, 19), 10, 100), (u'p1', datetime.date(2013, 5, 27), 5, 75), (u'p1', datetime.date(2013, 5, 30), 5, 75), (u'p1', datetime.date(2013, 6, 18), 12, 240)]
p1 2013-05-19 10 10 100
p1 2013-05-27 5 15 75
p1 2013-05-30 5 15 75
p1 2013-06-18 12 20 240
Comment from Frustrated: an alternative is to pick the payrate row with starting <= workedon ORDER BY starting DESC LIMIT 1.
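The condition quoted just above (starting <= workedon, ORDER BY starting DESC, LIMIT 1) can be written as a correlated scalar subquery: for each entry, take the hourly rate of the latest payrate that started on or before workedon. Here is a minimal sketch on in-memory SQLite with the sample data from the complete example; SQLite is a demo assumption. It never needs a derived ending, and LIMIT 1 means a rate-change day can't match two rows, at the cost of a subquery per entry row (how much that matters depends on the database and its indexes).

```python
import sqlite3

# Sample data from the complete example; dates stored as ISO-8601 text
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE payrate (payrate_id INTEGER PRIMARY KEY, person_id INTEGER,
                          hourly INTEGER, starting TEXT);
    CREATE TABLE entry (entry_id INTEGER PRIMARY KEY, person_id INTEGER,
                        workedon TEXT, hours INTEGER);
    INSERT INTO payrate VALUES (1, 1, 10, '2013-05-17'),
                               (2, 1, 15, '2013-05-25'),
                               (3, 1, 20, '2013-06-10');
    INSERT INTO entry VALUES (1, 1, '2013-05-19', 10),
                             (2, 1, '2013-05-27', 5),
                             (3, 1, '2013-05-30', 5),
                             (4, 1, '2013-06-18', 12);
""")

# For each entry, the rate of the latest payrate starting on or before workedon
rows = conn.execute("""
    SELECT entry.workedon, entry.hours,
           entry.hours * (SELECT payrate.hourly
                          FROM payrate
                          WHERE payrate.person_id = entry.person_id
                            AND payrate.starting <= entry.workedon
                          ORDER BY payrate.starting DESC
                          LIMIT 1) AS cost
    FROM entry
    ORDER BY entry.person_id, entry.workedon
""").fetchall()

for row in rows:
    print(row)
```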