异步上下文管理器

65

我有一个异步API,用于连接并发送邮件到一个SMTP服务器,该服务器需要进行一些设置和拆卸,因此它很适合使用Python 3的contextlib中的contextmanager

但是,我不知道是否可能编写,因为它们都使用生成器语法进行编写。

以下示例演示了问题(包含混合使用yield基础和async-await语法,以演示异步调用和对上下文管理器的yield调用之间的区别)。

@contextmanager
async def smtp_connection():
    client = SMTPAsync()
    ...

    try:
        await client.connect(smtp_url, smtp_port)
        await client.starttls()
        await client.login(smtp_username, smtp_password)
        yield client
    finally:
        await client.quit()

目前在Python中是否有可能实现这种操作?如果可以,我该如何使用withas语句呢?如果不行,我有没有其他办法可以实现这个操作 - 或者可以使用旧式上下文管理器吗?


1
asyncio 还引入了 async with 异步上下文管理器协议,请参见:https://www.python.org/dev/peps/pep-0492/#asynchronous-context-managers-and-async-with - jonrsharpe
这看起来正是我想要的。有机会时将尝试实现它。 - freebie
3
从3.7版本开始(在2018年某个时间发布),contextlib将拥有@asynccontextmanager - Yet Another User
相关链接:https://dev59.com/WnA65IYBdhLWcg3wrQl4 - Charlie Parker
当使用@asynccontextmanager时,如何知道在try和finally中调用哪些函数?例如,我的自定义类有__aexit____aenter__。我需要手动调用它们吗? - Charlie Parker
4个回答

87

自Python 3.7起,您可以编写:

from contextlib import asynccontextmanager

@asynccontextmanager
async def smtp_connection():
    client = SMTPAsync()
    ...

    try:
        await client.connect(smtp_url, smtp_port)
        await client.starttls()
        await client.login(smtp_username, smtp_password)
        yield client
    finally:
        await client.quit()

在3.7版本之前,您可以使用async_generator包来实现此功能。在3.6版本上,您可以编写:

# This import changed, everything else is the same
from async_generator import asynccontextmanager

@asynccontextmanager
async def smtp_connection():
    client = SMTPAsync()
    ...

    try:
        await client.connect(smtp_url, smtp_port)
        await client.starttls()
        await client.login(smtp_username, smtp_password)
        yield client
    finally:
        await client.quit()

如果你想一直工作到3.5,你可以写:

# This import changed again:
from async_generator import asynccontextmanager, async_generator, yield_

@asynccontextmanager
@async_generator      # <-- added this
async def smtp_connection():
    client = SMTPAsync()
    ...

    try:
        await client.connect(smtp_url, smtp_port)
        await client.starttls()
        await client.login(smtp_username, smtp_password)
        await yield_(client)    # <-- this line changed
    finally:
        await client.quit()

你如何知道在 try 和 finally 中调用哪些函数?例如,我的自定义类有一个 __aexit____aenter__。我需要手动调用它们吗? - Charlie Parker
还是每个人都有 .close()/.quit() 方法? - Charlie Parker
我认为从文档中澄清或提到你的装饰函数必须在调用时返回一个生成器迭代器是很重要的,否则会让人困惑需要做什么:https://docs.python.org/3/library/contextlib.html - Charlie Parker

46

感谢 @jonrsharpe,我能够创建一个异步上下文管理器。

以下是我的示例代码,供需要参考的任何人使用:

class SMTPConnection():
    def __init__(self, url, port, username, password):
        self.client   = SMTPAsync()
        self.url      = url
        self.port     = port
        self.username = username
        self.password = password

    async def __aenter__(self):
        await self.client.connect(self.url, self.port)
        await self.client.starttls()
        await self.client.login(self.username, self.password)

        return self.client

    async def __aexit__(self, exc_type, exc, tb):
        await self.client.quit()

使用方法:

async with SMTPConnection(url, port, username, password) as client:
    await client.sendmail(...)

如果我做错了什么,请随意指出。


2
问题在于,如果同时使用两次,您的第二个输入客户端将覆盖第一个客户端,并且第一个客户端上的退出也会退出第二个客户端。 - iScrE4m
@iScrE4m 是的,我没有预料到它会被多次使用,只是创建一个实例,按需进行一次性使用。也许可以创建一个包装类来委托它的 __aenter____aexit__ 到新的实例。 - freebie
4
记得在你的 __aexit__ 中处理异常,否则你会隐藏异常并出现奇怪的隐形错误。 - Thomas Ahle
AIU这代表着“手动操作”,而asynccontextmanager是实现这一功能的现代方式,因此这个答案可能已经过时了,需要标记为过时吗? - P i
你如何知道在try和finally中调用哪些函数?例如,我的自定义类有一个__aexit____aenter__。我需要手动调用它们吗? - Charlie Parker
还是每个人都有 .close()/.quit() 方法? - Charlie Parker

8
asyncio_extras包提供了一个不错的解决方案:
import asyncio_extras

@asyncio_extras.async_contextmanager
async def smtp_connection():
    client = SMTPAsync()
    ...

对于Python<3.6,您还需要安装async_generator包,并将yield client替换为await yield_(client)


你如何知道在try和finally中调用哪些函数?例如,我的自定义类有一个__aexit____aenter__。我需要手动调用它们吗? - Charlie Parker
还是每个人都有一个.close()/.quite()方法? - Charlie Parker
我认为从文档中澄清或提到你的装饰函数必须在调用时返回一个生成器迭代器是很重要的,否则会让人困惑需要做什么:https://docs.python.org/3/library/contextlib.html - Charlie Parker

0
我发现你需要在 try 中调用 obj.__aenter__(...),并在 final 中调用 obj.__aexit__(...)。如果你只想抽象一个具有资源的过于复杂的对象,则可能也需要这样做。
例如:
import asyncio
from contextlib import asynccontextmanager

from pycoq.common import CoqContext, LocalKernelConfig
from pycoq.serapi import CoqSerapi

from pdb import set_trace as st


@asynccontextmanager
async def get_coq_serapi(coq_ctxt: CoqContext) -> CoqSerapi:
    """
    Returns CoqSerapi instance that is closed with a with statement.
    CoqContext for the file is also return since it can be used to manipulate the coq file e.g. return
    the coq statements as in for `stmt in pycoq.split.coq_stmts_of_context(coq_ctxt):`.

    example use:
    ```
    filenames = pycoq.opam.opam_strace_build(coq_package, coq_package_pin)
        filename: str
        for filename in filenames:
            with get_coq_serapi(filename) as coq, coq_ctxt:
                for stmt in pycoq.split.coq_stmts_of_context(coq_ctxt):
    ```

    ref:
    - https://dev59.com/E1oU5IYBdhLWcg3we20R
    - https://dev59.com/WnA65IYBdhLWcg3wrQl4

    Details:

    Meant to replace (see Brando's pycoq tutorial):
    ```
            async with aiofile.AIOFile(filename, 'rb') as fin:
                coq_ctxt = pycoq.common.load_context(filename)
                cfg = opam.opam_serapi_cfg(coq_ctxt)
                logfname = pycoq.common.serapi_log_fname(os.path.join(coq_ctxt.pwd, coq_ctxt.target))
                async with pycoq.serapi.CoqSerapi(cfg, logfname=logfname) as coq:
    ```
    usually then you loop through the coq stmts e.g.
    ```
                    for stmt in pycoq.split.coq_stmts_of_context(coq_ctxt):
    ```
    """
    try:
        import pycoq
        from pycoq import opam
        from pycoq.common import LocalKernelConfig
        import os

        # - note you can't return the coq_ctxt here so don't create it due to how context managers work, even if it's needed layer for e.g. stmt in pycoq.split.coq_stmts_of_context(coq_ctxt):
        # _coq_ctxt: CoqContext = pycoq.common.load_context(coq_filepath)
        # - not returned since it seems its only needed to start the coq-serapi interface
        cfg: LocalKernelConfig = opam.opam_serapi_cfg(coq_ctxt)
        logfname = pycoq.common.serapi_log_fname(os.path.join(coq_ctxt.pwd, coq_ctxt.target))
        # - needed to be returned to talk to coq
        coq: CoqSerapi = pycoq.serapi.CoqSerapi(cfg, logfname=logfname)
        # - crucial, or coq._kernel is None and .execute won't work
        await coq.__aenter__()  # calls self.start(), this  must be called by itself in the with stmt beyond yield
        yield coq
    except Exception as e:
        # fin.close()
        # coq.close()
        import traceback
        await coq.__aexit__(Exception, e, traceback.format_exc())
        # coq_ctxt is just a data class serapio no need to close it, see: https://github.com/brando90/pycoq/blob/main/pycoq/common.py#L32
    finally:
        import traceback
        err_msg: str = 'Finally exception clause'
        exception_type, exception_value = Exception('Finally exception clause'), ValueError(err_msg)
        print(f'{traceback.format_exc()=}')
        await coq.__aexit__(exception_type, exception_value, traceback.format_exc())
        # coq_ctxt is just a data class so no need to close it, see: https://github.com/brando90/pycoq/blob/main/pycoq/common.py#L32


# -

async def loop_through_files_original():
    ''' '''
    import os

    import aiofile

    import pycoq
    from pycoq import opam

    coq_package = 'lf'
    from pycoq.test.test_autoagent import with_prefix
    coq_package_pin = f"file://{with_prefix('lf')}"

    print(f'{coq_package=}')
    print(f'{coq_package_pin=}')
    print(f'{coq_package_pin=}')

    filenames: list[str] = pycoq.opam.opam_strace_build(coq_package, coq_package_pin)
    filename: str
    for filename in filenames:
        print(f'-> {filename=}')
        async with aiofile.AIOFile(filename, 'rb') as fin:
            coq_ctxt: CoqContext = pycoq.common.load_context(filename)
            cfg: LocalKernelConfig = opam.opam_serapi_cfg(coq_ctxt)
            logfname = pycoq.common.serapi_log_fname(os.path.join(coq_ctxt.pwd, coq_ctxt.target))
            async with pycoq.serapi.CoqSerapi(cfg, logfname=logfname) as coq:
                print(f'{coq._kernel=}')
                for stmt in pycoq.split.coq_stmts_of_context(coq_ctxt):
                    print(f'--> {stmt=}')
                    _, _, coq_exc, _ = await coq.execute(stmt)
                    if coq_exc:
                        raise Exception(coq_exc)


async def loop_through_files():
    """
    to test run in linux:
    ```
        python ~pycoq/pycoq/utils.py
        python -m pdb -c continue ~/pycoq/pycoq/utils.py
    ```
    """
    import pycoq

    coq_package = 'lf'
    from pycoq.test.test_autoagent import with_prefix
    coq_package_pin = f"file://{with_prefix('lf')}"

    print(f'{coq_package=}')
    print(f'{coq_package_pin=}')
    print(f'{coq_package_pin=}')

    filenames: list[str] = pycoq.opam.opam_strace_build(coq_package, coq_package_pin)
    filename: str
    for filename in filenames:
        print(f'-> {filename=}')
        coq_ctxt: CoqContext = pycoq.common.load_context(filename)
        async with get_coq_serapi(coq_ctxt) as coq:
            print(f'{coq=}')
            print(f'{coq._kernel=}')
            stmt: str
            for stmt in pycoq.split.coq_stmts_of_context(coq_ctxt):
                print(f'--> {stmt=}')
                _, _, coq_exc, _ = await coq.execute(stmt)
                if coq_exc:
                    raise Exception(coq_exc)


if __name__ == '__main__':
    asyncio.run(loop_through_files_original())
    asyncio.run(loop_through_files())
    print('Done!\a\n')

查看代码:https://github.com/brando90/pycoq/blob/main/pycoq/utils.py


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接