Why do we need `async for` and `async with`?

What is the purpose of introducing `async for` and `async with`? I know there are PEPs for these statements, but they are clearly written for language designers rather than ordinary users. I'm hoping for a high-level rationale, with examples.
I did some research on my own and found this answer:

The `async for` and `async with` statements are needed because you would break the `yield from`/`await` chain with bare `for` and `with` statements.

The author doesn't give an example of breaking the chain, so I'm still confused. Also, I notice that Python has `async for` and `async with`, but no `async while` or `async try ... except`. That sounds odd, since `for` and `with` are just syntactic sugar over `while` and `try ... except`. I mean, wouldn't async versions of the latter statements be more flexible, since they are the building blocks of the former?
There is another answer that discusses `async for`, but it only covers what it is and isn't for, without detailing what it actually does. As a bonus: are `async for` and `async with` syntactic sugar? If so, what are their verbose equivalents?

"`for` and `with` are just syntactic sugar over `while` and `try ... except`": no, far from it; each has its own particularities. - deceze

@deceze The official documentation says the `with` statement is "semantically equivalent" to `try...except...finally`. And you can easily implement a `for` loop with `while` and `next`. Maybe they aren't syntactic sugar, but they aren't that different either. - nalzok

You need the new syntax because, if they were invoked implicitly by "sugared" `with`/`for` statements, where else would you put the `await` for the asynchronous `__enter__`/`__exit__`/`__iter__`/`__next__`? - deceze

No, because that would just be a blocking call that executes an async function. It would not allow the event loop to run any other scheduled coroutines, since you would just be starting and stopping an event loop to resolve a single asynchronous "enter". - deceze

If you put it that way, yes. `for` and `with` encapsulate protocols, specific patterns involving specific methods, which you could replicate manually with `while` and `try..except..finally`. But the point is to make those patterns reusable rather than writing a lot of boilerplate every time. Since the boilerplate is different for the asynchronous versions, you need specific `async` versions of them. - deceze
3 Answers

TLDR: `for` and `with` are highly non-trivial syntactic sugar: they encapsulate the steps of several related method calls. This makes it impossible to manually `await` between those steps, but properly asynchronous use of `for`/`with` requires exactly that. At the same time, it means there must be dedicated `async` support for them.

Why we cannot `await` the nice things

Python's statements and expressions are backed by so-called protocols: when an object is used in a specific statement/expression, Python calls the corresponding "special methods" on the object to allow customisation. For example, `x in [1, 2, 3]` delegates to `list.__contains__` to define what `in` actually means.
Most protocols are simple: only one special method is called per statement/expression. If the only `async` feature we had were the primitive `await`, we could still make all of these "one special method" statements/expressions "async" by inserting an `await` in the right place.
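To make this concrete, here is a hypothetical sketch (the `AsyncSet` class and its `contains` coroutine are invented for illustration): a container whose membership test needs asynchronous work can expose an explicit coroutine, and one well-placed `await` is all that is required, with no new statement syntax:

```python
import asyncio

class AsyncSet:
    """Made-up container whose membership test needs asynchronous work."""
    def __init__(self, items):
        self._items = set(items)

    async def contains(self, item):
        await asyncio.sleep(0)  # stand-in for e.g. a database round-trip
        return item in self._items

async def main():
    s = AsyncSet([1, 2, 3])
    # a synchronous `2 in s` would call __contains__; for the async case a
    # single well-placed await is enough, no new statement syntax needed
    return await s.contains(2)

print(asyncio.run(main()))  # True
```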

In contrast, the `for` and `with` statements each correspond to multiple steps: `for` uses the iterator protocol to repeatedly fetch the iterator's `__next__` item, and `with` uses the context manager protocol to both enter and exit a context.
Importantly, both involve multiple steps that may need to be asynchronous. While we can manually add an `await` to one of these steps, we cannot cover all of them.

  • The easier case to look at is `with`: we can look at the `__enter__` and `__exit__` methods separately.

    We could naively define a synchronous context manager with asynchronous special methods. For entering, this actually works by adding an `await` strategically:

    with AsyncEnterContext() as acm:
        context = await acm
        print("I entered an async context and all I got was this lousy", context)
    

    However, it already breaks down if we use a single with statement for multiple contexts: We would first enter all contexts at once, then await all of them at once.

    with AsyncEnterContext() as acm1, AsyncEnterContext() as acm2:
        context1, context2 = await acm1, await acm2  # wrong! acm1 must be entered completely before loading acm2
        print("I entered many async contexts and all I got was a rules lawyer telling me I did it wrong!")
    

    Worse, there is just no single point where we could await exiting properly.
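By comparison, a rough hand-expansion of `for` (simplified; the real statement also handles `else` clauses and more) shows why the same trick fails there too: the statement hides several distinct calls, with no single place to put an `await`:

```python
iterable = [1, 2, 3]
result = []

# `for item in iterable: result.append(item)` roughly expands to:
iterator = iter(iterable)        # step 1: __iter__
while True:
    try:
        item = next(iterator)    # step 2: __next__, once per item
    except StopIteration:        # step 3: the end-of-iteration signal
        break
    result.append(item)          # the loop body

print(result)  # [1, 2, 3]
```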

While `for` and `with` are syntactic sugar, they are non-trivial sugar: they make multiple operations concise. As a consequence, the individual operations cannot simply be `await`ed. Only `async with` and `async for` can cover every step.

Why we want to `async` the nice things

Both `for` and `with` are abstractions: they completely encapsulate the idea of iteration/of a context.

To pick one of the two again, Python's `for` is an abstraction of internal iteration; `while`, in contrast, is an abstraction of external iteration. In short, this means the whole point of `for` is that the programmer does not have to know how the iteration actually works.

  • Compare how one would iterate a list using for or while:
    some_list = list(range(20))
    index = 0                      # lists are indexed from 0
    while index < len(some_list):  # lists are indexed up to len-1
        print(some_list[index])    # lists are directly index'able
        index += 1                 # lists are evenly spaced
    
    for item in some_list:         # lists are iterable
        print(item)
    
    The external while iteration relies on knowledge about how lists work concretely: It pulls implementation details out of the iterable and puts them into the loop. In contrast, internal for iteration only relies on knowing that lists are iterable. It would work with any implementation of lists, and in fact any implementation of iterables.
The bottom line is that the whole point of `for` and `with` is not having to deal with implementation details, and that includes knowing which steps need to be `async`. Only a generic `async with`/`async for` can cover every step without us knowing which step needs it.
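As a hedged sketch of what `async for` buys us, here is a simplified hand-expansion using the `__aiter__`/`__anext__` protocol (the `Ticker` class is invented for illustration):

```python
import asyncio

class Ticker:
    """Made-up async iterable yielding 0..n-1, awaiting before each item."""
    def __init__(self, n):
        self.n = n
        self.i = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.i >= self.n:
            raise StopAsyncIteration
        await asyncio.sleep(0)  # hand control back to the event loop
        value = self.i
        self.i += 1
        return value

async def main():
    result = []
    # `async for item in Ticker(3): result.append(item)` roughly expands to:
    iterator = Ticker(3).__aiter__()
    while True:
        try:
            item = await iterator.__anext__()  # the await a plain `for` cannot express
        except StopAsyncIteration:
            break
        result.append(item)
    return result

print(asyncio.run(main()))  # [0, 1, 2]
```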

Why the `async` variants are needed

A reasonable question is why `for` and `with` get `async` variants while other statements do not. There is a subtle point about `for` and `with` that is not apparent in everyday use: both express a form of concurrency, and concurrency is the domain of `async`.
Without going into too much detail, the rough idea is the equivalence of routines (`await`), iterables (`for`) and context managers (`with`). As the answer quoted in the question points out, a coroutine is in fact a kind of generator. Obviously, generators are also iterables, and in fact any iterable can be expressed as a generator. Less obviously, context managers are also equivalent to generators; most importantly, `contextlib.contextmanager` turns a generator into a context manager.
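For instance, `contextlib.contextmanager` turns a generator into a context manager: everything before the `yield` runs on entry, everything after it on exit:

```python
from contextlib import contextmanager

events = []

@contextmanager
def managed(name):
    events.append(f"enter {name}")      # runs as __enter__
    try:
        yield name                      # the value bound by `as`
    finally:
        events.append(f"exit {name}")   # runs as __exit__

with managed("resource") as r:
    events.append(f"using {r}")

print(events)  # ['enter resource', 'using resource', 'exit resource']
```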
To handle all these kinds of concurrency consistently, we need `async` variants of routines (`await`), of iterables (`async for`) and of context managers (`async with`). Only then can every step be covered consistently.

@CharlieParker In your example `for` loop, only the body is `async`. With `async for`, the iterable itself can be `async`: for example, it could fetch data from a remote database, waiting until each item arrives. - MisterMiyagi

@CharlieParker That you think of `for` as just "incrementing an index or counter" shows what an important, detail-hiding abstraction it is. Even simple nesting of higher-order iterators such as `map` or the `itertools` helpers is enormously complex in total. If anything, `async` iterators can be simpler to reason about, since `async for` plus an event loop switch deterministically, whereas `for` plus threads can interleave arbitrarily. - MisterMiyagi

@CharlieParker Yes, that is basically correct. If you think of `for x in y:` as a `while` loop that keeps running `x = y.__next__()`, then likewise you can think of `async for x in y:` as a `while` loop that keeps running `x = await y.__anext__()`. This allows suspending inside the `async for` while waiting for the asynchronous iterator to produce its next item. - MisterMiyagi

But isn't that bad? Doesn't it mean the `async for` blocks? Wouldn't it be better to launch a bunch of tasks in the loop and then await them outside the loop? If `async for` blocks at the `await` keyword, what good is it? Beyond possibly letting something else run while we wait, it seems to offer only a small benefit and still basically blocks. I feel like I am missing something. - Charlie Parker

@CharlieParker Allowing the loop to block (asynchronously! That is, letting other tasks run) while waiting for an item is the entire point of `async for`. That is hardly a small thing: it can mean blocking for seconds, minutes, hours, or outright indefinitely on the result of another task. It is also something entirely different from grouping several tasks together, as `gather` does. If this information is not enough for you, I am not sure further comments are the right place to clarify whatever it is you are actually missing. - MisterMiyagi


`async for` and `async with` are the logical continuation of a progression from low level to high level.

In the past, a `for` loop in a programming language could simply iterate over an array of values indexed 0, 1, 2, ... max.

Python's `for` loop is a higher-level construct. It can iterate over anything that supports the iteration protocol, such as the elements of a set or the nodes of a tree, none of which have items numbered 0, 1, 2, etc.

At the core of the iteration protocol is the `__next__` special method. Each successive call returns the next item (which could be a computed value or retrieved data) or signals the end of the iteration.

`async for` is the asynchronous counterpart: instead of calling the regular `__next__`, it awaits the asynchronous `__anext__`; everything else stays the same. This allows common idioms to be used in asynchronous programs:

# 1. print lines of text stored in a file
for line in regular_file:
    print(line)

# 2A. print lines of text as they arrive over the network,
#
# The same idiom as above, but the asynchronous character makes
# it possible to execute other tasks while waiting for new data
async for line in tcp_stream:
    print(line)

# 2B: the same with a spawned command
async for line in running_subprocess.stdout:
    print(line)
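As a self-contained sketch of the same idea (an `async` generator is the simplest way to get an object with `__anext__`; the `asyncio.sleep(0)` is a stand-in for waiting on real network data):

```python
import asyncio

async def lines():
    """Async generator standing in for lines arriving over a network stream."""
    for text in ("first", "second", "third"):
        await asyncio.sleep(0)  # pretend to wait for the next packet
        yield text

async def main():
    received = []
    async for line in lines():  # awaits __anext__ under the hood
        received.append(line)
    return received

print(asyncio.run(main()))  # ['first', 'second', 'third']
```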

The situation with `async with` is similar. In short: the more convenient `with` block replaced the `try .. finally` construct and is now considered idiomatic. Through the `__enter__` and `__exit__` methods of the context manager protocol, it can communicate with any supporting object to enter and exit the block. Naturally, everything previously written with `try .. finally` has been rewritten as context managers (locking, open-close calls, etc.).

`async with` is the counterpart with asynchronous `__aenter__` and `__aexit__` special methods. While asynchronous code entering or exiting a `with` block waits for new data, for a lock, or for some other condition to be met, other tasks can run.
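A minimal sketch of such an object, with the sleeps standing in for real I/O (the `AsyncResource` class is invented for illustration):

```python
import asyncio

class AsyncResource:
    """Made-up async context manager; the sleeps stand in for real I/O."""
    async def __aenter__(self):
        await asyncio.sleep(0)  # e.g. open a network connection
        return "connection"

    async def __aexit__(self, exc_type, exc, tb):
        await asyncio.sleep(0)  # e.g. close the connection
        return False            # do not suppress exceptions

async def main():
    async with AsyncResource() as conn:
        return conn

print(asyncio.run(main()))  # connection
```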

Note: unlike with `for`, a plain (non-async) `with` statement could once be used with an asynchronous object, as in `with await lock:`, but this is deprecated or no longer supported (and note that it was not fully equivalent to `async with`).


Basically, for `async for`, the syntax just makes sure the `for` loop works properly with asynchronous code, since implementing that isn't easy (this is my guess). So the `for` loop works the same as usual, but the `await` keyword is now allowed. Is that roughly correct? - Charlie Parker

(e.g. similar to `asyncio.gather`, which makes the user feel the IO calls happen concurrently) I think I am confused about what to expect from `async for`. My current mental model is that `await` either hands control to a new coroutine (written by the developer) or returns control to the event loop (which schedules the next "idle" coroutine instead of waiting), and that is where the "magic" is (so the IO actually overlaps). But with `async for` I am not sure. I am curious whether there is nothing special about it, as with `async with`, or whether it is a manual way of implementing `asyncio.gather`. - Charlie Parker

@CharlieParker Let me try to explain the line `async for line in tcp_stream:`. A TCP stream is a stream of bytes transmitted over the network in packets of varying sizes. The reader collects those packets into a buffer and only returns a complete line of text once a newline finally appears. From the perspective of the `async for`, it awaits the stream's `__anext__` method, which, as you wrote, hands control to the event loop to run coroutines that are ready, until the whole line has been received into the buffer. Then `__anext__` is also ready to return that line to the `async for` loop. - VPfB

@VPfB Thanks for the info! Let me restate it to make sure I understand. So `async for` is essentially a generator that fetches things from io asynchronously, so whenever something is ready, and crucially in the right order, it returns the next item. Right? So `async for` not only allows the `await` keyword in its body, but also allows the iterator to fetch the next item asynchronously while respecting the iterator's order. Right? - Charlie Parker

@CharlieParker: On the first point: yes, that is exactly the main reason for asynchronous iteration; just a small note: a better term than "expensive" is "I/O bound" (https://en.wikipedia.org/wiki/I/O_bound). On the second point: that may be the case in examples where the iterator assembles whole lines (or other units of data), but in general it is not the main characteristic of async iteration. A plain iterator reading lines from a file on disk is almost the same; the difference is that local file I/O is non-blocking and usually so fast that we can consider the result immediately available. - VPfB

My understanding is that `async with` allows Python to use the `await` keyword inside a context manager without erroring; removing `async` from the `with` would cause an error. This is useful because the created object very likely performs expensive io operations that we have to wait for, so we will likely await methods on the object created by this special asynchronous context manager. Opening and closing the context manager incorrectly could create problems in Python (otherwise, why would Python make its users learn more subtle syntax and semantics?).
I haven't fully tested what `async for` does or how involved it is, but I would love to see an example, and may test it later once I need it and update this answer. I will put examples here: https://github.com/brando90/ultimate-utils/blob/master/tutorials_for_myself/concurrency/asyncio_for.py

For now, see my commented example with `async with` (the script lives at https://github.com/brando90/ultimate-utils/blob/master/tutorials_for_myself/concurrency/asyncio_my_example.py):

"""
1. https://realpython.com/async-io-python/#the-asyncawait-syntax-and-native-coroutines
2. https://realpython.com/python-concurrency/
3. https://dev59.com/-lEG5IYBdhLWcg3wX7pa

todo - async with, async for.

todo: meaning of:
    - The async for and async with statements are only needed to the extent that using plain for or with would “break”
        the nature of await in the coroutine. This distinction between asynchronicity and concurrency is a key one to grasp
    - One exception to this that you’ll see in the next code is the async with statement, which creates a context
        manager from an object you would normally await. While the semantics are a little different, the idea is the
        same: to flag this context manager as something that can get swapped out.
    - download_site() at the top is almost identical to the threading version with the exception of the async keyword on
        the function definition line and the async with keywords when you actually call session.get().
        You’ll see later why Session can be passed in here rather than using thread-local storage.
    - An asynchronous context manager is a context manager that is able to suspend execution in its enter and exit
        methods.
"""

import asyncio
from asyncio import Task

import time

import aiohttp
from aiohttp.client_reqrep import ClientResponse

from typing import Coroutine


async def download_site(coroutine_name: str, session: aiohttp.ClientSession, url: str) -> ClientResponse:
    """
    Calls an expensive io (get data from a url) using the special session (awaitable) object. Note that not all objects
    are awaitable.
    """
    # - using a plain `with` here would be bad in my opinion, since async with is already
    # mysterious and it would then be used twice in this script
    # async with session.get(url) as response:
    #     print("Read {0} from {1}".format(response.content_length, url))
    # - calling session.get(url) without await won't work: it only creates the coroutine, which
    # **has** to be awaited. What makes the (buggy) version synchronous is having the main
    # coroutine await each download in order, instead of handing all the coroutines at once to
    # the event loop (e.g. with asyncio.gather, which takes all the coroutines, returns the
    # results in a list, and thus doesn't serialize them)!
    # response = session.get(url)
    # - the right way to write async code is to await here so someone else can run. Note that if
    # download_site is awaited sequentially in a for loop by the parent, this won't help anyway.
    response = await session.get(url)
    print(f"Read {response.content_length} from {url} using {coroutine_name=}")
    return response

async def download_all_sites_not_actually_async_buggy(sites: list[str]) -> list[ClientResponse]:
    """
    Demo of code that is not actually concurrent. The code isn't truly asynchronous/concurrent
    because we await each io call (to the network) sequentially inside the for loop. To avoid
    this issue, give the list of coroutines to a function that actually dispatches the io
    concurrently, like asyncio.gather.

    My understanding is that async with allows the given object to be an awaitable object. The
    object created does io calls that might block, so it's often the case that we await it.
    Recall that when we run `await f()`, f is either 1) a coroutine that gains control (but might
    block the code!) or 2) an io call that takes a long time. Because of how Python works, after
    the await finishes, the program expects the response to "actually be there". Thus, awaiting
    blindly doesn't speed up the code. Do awaits on real io calls and hand them to something
    that gives them to the event loop (e.g. asyncio.gather).
    """
    # - create an awaitable object without having the context manager explode if it gives up
    # - execution. Crucially, the session is an aiohttp session, so its setup and teardown are
    # - asynchronous and must be driven with async with.
    async with aiohttp.ClientSession() as session:
    # with aiohttp.ClientSession() as session:  # won't work: __aenter__/__aexit__ must be awaited
        responses: list[ClientResponse] = []
        for i, url in enumerate(sites):
            # awaiting each download in turn serializes them: request i+1 only starts
            # after request i has completed, so nothing overlaps
            response: ClientResponse = await download_site(f'coroutine{i}', session, url)
            responses.append(response)
        return responses


async def download_all_sites_truly_async(sites: list[str]) -> list[ClientResponse]:
    """
    Truly async program that creates a bunch of coroutines that download data from urls and then
    uses gather to have the event loop run them asynchronously (and thus efficiently). Note that
    there is only one process, though.
    """
    # - async with indicates that the session does expensive, asynchronous io during setup and
    # - teardown, so while entering/exiting it, control can go back to the event loop and other
    # - coroutines can do work while the io happens
    async with aiohttp.ClientSession() as session:
        tasks: list[Task] = []
        for i, url in enumerate(sites):
            task: Task = asyncio.ensure_future(download_site(f'coroutine{i}', session, url))
            tasks.append(task)
        responses: list[ClientResponse] = await asyncio.gather(*tasks, return_exceptions=True)
        return responses


if __name__ == "__main__":
    # - args
    sites = ["https://www.jython.org", "http://olympus.realpython.org/dice"] * 80
    start_time = time.time()

    # - run main async code
    # main_coroutine: Coroutine = download_all_sites_truly_async(sites)
    main_coroutine: Coroutine = download_all_sites_not_actually_async_buggy(sites)
    responses: list[ClientResponse] = asyncio.run(main_coroutine)

    # - print stats
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} sites in {duration} seconds")
    print('Success, done!\a')

I am still a bit confused about using `async` with `for`s and `with`s. My understanding is that `async def` creates a coroutine, which is a function that can yield execution control back to its caller. But in `async for x in range(10)` I don't see why the `async` is needed, since I have written `for` loops that call await, e.g. `for i in range(num_steps): await asyncio.sleep(1)`. So I don't see why `async` is needed on the `for` loop. Could you clarify? - Charlie Parker
