SqlAlchemy asyncio orm: 如何查询数据库

29

在 SqlAlchemy 的异步 orm 引擎中,如何查询一个表并获取一个值或全部?

我知道对于非异步方法,我只需要执行:

SESSION.query(TableClass).get(x)

但是在尝试使用异步方法时,它会抛出以下错误:

AttributeError: 'AsyncSession' object has no attribute 'query'.

下面是我定义的 SESSION 变量。LOOP 变量仅为 asyncio.get_event_loop(),用于在加载我的 sql 模块并填充用作缓存的变量以避免每次需要某些内容时都进行数据库缓存时启动异步方法:

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session

from .. import CONFIG, LOOP

def _build_async_db_uri(uri):
    if "+asyncpg" not in uri:
        return '+asyncpg:'.join(uri.split(":", 1))
    return uri

async def start() -> declarative_base:
    engine = create_async_engine(_build_async_db_uri(CONFIG.general.sqlalchemy_db_uri))
    async with engine.begin() as conn:
        BASE.metadata.bind = engine
        await conn.run_sync(BASE.metadata.create_all)
    return scoped_session(sessionmaker(bind=engine, autoflush=False, class_=AsyncSession))


BASE = declarative_base()
SESSION = LOOP.run_until_complete(start())

这里是一个表格和缓存函数的示例:

class TableClass:
    __tablename__ = "tableclass"
    id = Column(Integer, primary_key = True)
    alias = Column(Integer)

CACHE = {}
async def _load_all():
    global CACHE
    try:
        curr = await SESSION.query(TableClass).all()
        CACHE = {i.id: i.alias for i in curr}

LOOP.run_until_complete(_load_all())
2个回答

31
session.query 是旧版的 API。异步版本使用 select 和相关方法。
from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker


engine = create_async_engine(_build_async_db_uri(CONFIG.general.sqlalchemy_db_uri))
async_session = sessionmaker(
    engine, expire_on_commit=False, class_=AsyncSession
)


CACHE = {}
async def _load_all():
    global CACHE
    try:
        async with async_session() as session:
            q = select(TableClass)
            result = await session.execute(q)
            curr = result.scalars()
            CACHE = {i.id: i.alias for i in curr}
    except:
        pass

LOOP.run_until_complete(_load_all())

您可以在这里阅读更多关于SqlAlchemy异步I/O的内容here


感谢提供文档链接。我注意到了一些内容。在进一步的示例中,使用遗留API(非asyncio API)的小型实现在可等待对象中。如果我使用run_async,那么非异步函数是否会被执行为asyncio函数? - Marcel Alexandru
当然可以。您始终可以使用同步引擎和会话与旧API。 - SimfikDuke
这个确切的叫法是什么?如果我知道这种类型查询 q = select(TableClass) 的确切名称,搜索起来会更容易。 - Harshit Singhai
1
@HarshitSinghai 这是SQLAlchemy核心。这里有文档:https://docs.sqlalchemy.org/en/14/core/ - SimfikDuke

10

通过主键查询

自版本1.4起,新增了Session.get()方法,替代已经废弃的Query.get()方法。

  id = 1
  user = await session.get(User, id)

Session.get - SQLAlchemy 1.4文档

按其他列查询

使用select语句是解决问题的途径。

statement = select(User).where(User.name == 'John')
result = await session.execute(statement)

# This will return a collection of users named 'John'
johns : list[User] = result.scalars.all()

# This will take the first 'John'
first_john : User = result.scalar()

# This will take one result. 
# If your query has more than one result, an exception will be thrown. 
# Use it only if you're expecting a single result
john : User = result.scalars.one()

相关内容:https://dev59.com/YlMI5IYBdhLWcg3wptRR#63591326

select - SQLAlchemy 1.4 文档

execute

更多示例


1
scalars should be called. Correct calls are result.scalars().all() and result.scalars().one() - Safor
实际上我们都错了。scalar()确实可以工作,类似于scalars().one(),如果找到多个结果则会抛出错误。 - Ramon Dias

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接