Python AsyncIO中的异步生成器yield

5

我有一个简单的类,利用异步生成器来获取URL列表:

import aiohttp
import asyncio
import logging
import sys

LOOP = asyncio.get_event_loop()
N_SEMAPHORE = 3

FORMAT = '[%(asctime)s] - %(message)s'
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)

class ASYNC_GENERATOR(object):
    def __init__(self, n_semaphore=N_SEMAPHORE, loop=LOOP):
        self.loop = loop
        self.semaphore = asyncio.Semaphore(n_semaphore)
        self.session = aiohttp.ClientSession(loop=self.loop)

    async def _get_url(self, url):
        """
        Sends an http GET request to an API endpoint
        """

        async with self.semaphore:
            async with self.session.get(url) as response:
                logger.info(f'Request URL: {url} [{response.status}]')
                read_response = await response.read()

                return {
                    'read': read_response,
                    'status': response.status,
                }

    def get_routes(self, urls):
        """
        Wrapper around _get_url (multiple urls asynchronously)

        This returns an async generator
        """

        # Asynchronous http GET requests
        coros = [self._get_url(url) for url in urls]
        futures = asyncio.as_completed(coros)
        for future in futures:
            yield self.loop.run_until_complete(future)

    def close(self):
        self.session._connector.close()

当我执行这段代码的主要部分时:
if __name__ == '__main__':
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    for response in responses:
        response = next(ag.get_routes(['https://httpbin.org/get']))
    ag.close()

日志打印输出内容为:
[2018-05-15 12:59:49,228] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 12:59:49,235] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 12:59:49,242] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 12:59:49,285] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 12:59:49,290] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 12:59:49,295] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 12:59:49,335] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 12:59:49,340] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 12:59:49,347] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 12:59:49,388] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 12:59:49,394] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,444] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,503] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,553] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,603] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,650] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,700] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,825] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,875] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,922] - Request URL: https://httpbin.org/get [200]

由于responses是异步生成器,我期望它从异步生成器中产生一个响应(只有在实际产生后才发送请求),向没有x参数的端点发送一个单独的请求,然后产生异步生成器中的下一个响应。 这应该在带有x参数的请求和不带参数的请求之间来回切换。 然而,它先产生带有x参数的异步生成器的所有响应,然后是所有没有参数的https请求。

当我执行以下操作时会发生类似的事情:

ag = ASYNC_GENERATOR()
urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
responses = ag.get_routes(urls)
next(responses)
response = next(ag.get_routes(['https://httpbin.org/get']))
ag.close()

日志输出如下:

[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,656] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 13:08:38,681] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 13:08:38,695] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 13:08:38,717] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 13:08:38,741] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 13:08:38,750] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 13:08:38,773] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 13:08:38,792] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 13:08:38,803] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]

相反,我想要的是:

[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]

有时候我想先检索所有的响应再执行其他操作。但是,有时候我想插入一些中间请求,在生成器返回下一个项之前进行处理(即,生成器从分页搜索结果返回结果,我想在移动到下一页之前进一步处理每个页面的链接)。

我需要做哪些更改才能实现所需的结果?


responses is not an async generator; it's a regular, normal, synchronous generator function. To be async, you'd have to use async def responses(...): - Martijn Pieters
我不太明白。您能提供一小段代码吗?async def responses(...):应该放在get_routes_get_url之间吗?还是我需要在__main__中使用它?在async def responses(...):中,我需要等待什么?非常感谢您的耐心,因为我还在学习这个复杂的异步编程! - slaw
@MartijnPieters的问题在于OP正在使用as_completed,它本身是一个普通的生成器,但设计(奇怪地)用于与asyncio一起使用。因此,自然而然地使用另一个普通生成器进行包装。使用as_completed同步产生的协程通过run_until_complete是一种合法的使用方式,但我不建议这样做。 - user4815162342
@user4815162342 如果您不介意提供正确的解决方案,我很灵活并愿意学习如何处理这个问题。我从网络上不同的例子中获取了有限的知识来拼凑这个程序,所以我会感激任何帮助。希望我的目标是清晰明确的。 - slaw
1
@slaw:抱歉,我的评论太简短了,我不得不离开一下。在Python中,“异步生成器”这个术语有一个非常具体的含义;您的生成器函数将任务推入异步IO循环,但它不是异步生成器。您正在触发一系列任务,然后等待下一个任务完成;所有这些任务都是协作执行的。 - Martijn Pieters
显示剩余2条评论
1个回答

7
略过技术问题,关于 responses 是否是异步生成器的问题(它不是,因为 Python 使用了该术语),你的问题在于 as_completedas_completed 启动了一堆协程并提供了获取它们完成后的结果的方法,这些协程是并行运行的。虽然从文档(在更新版本中有所改进)中并不明显,但如果你考虑到原始的 concurrent.futures.as_completed 是基于线程并发执行的 future 来工作的,那么这些 future 在并行运行也就不难理解了。从概念上讲,asyncio futures 也是如此。
你的代码仅获取第一个(最快到达的)结果,然后开始执行其他任务,同时使用asyncio。传递给as_completed的剩余协程不会被冻结,仅因为没有人收集它们的结果 - 它们在后台执行其任务,一旦完成就可以通过await来准备好(在您的情况下是由as_completed内部的代码await,您可以使用loop.run_until_complete()来访问)。我敢猜测,不带参数的URL需要更长时间才能检索到比只有参数x的URL,这就是为什么它在所有其他协程之后被打印出来的原因。
换句话说,打印的那些日志行意味着asyncio正在执行其工作并提供您请求的并行执行!如果您不想要并行执行,请不要请求它,按顺序执行它们:
def get_routes(self, urls):
    for url in urls:
        yield loop.run_until_complete(self._get_url(url))

但这是一种不好的使用asyncio的方式 - 它的主循环是不可重入的,因此为了确保可组合性,你几乎肯定希望在顶层只旋转循环一次。通常使用类似loop.run_until_complete(main())loop.run_forever()的结构来实现。正如Martijn指出的那样,您可以通过使get_routes成为一个真正的异步生成器来保留良好的生成器API,从而实现这一点。
async def get_routes(self, urls):
    for url in urls:
        result = await self._get_url(url)
        yield result

现在你可以拥有一个像这样的main()协程:
async def main():
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    async for response in responses:
        # simulate `next` with async iteration
        async for other_response in ag.get_routes(['https://httpbin.org/get']):
            break
    ag.close()

loop.run_until_complete(main())

“get_routes()”异步生成器在“await self._get_url(url)”调用返回结果后,可以添加纯非查询字符串URL,然后在“yield result”之后运行至少这些请求以并行处理“urls”列表的同步处理。 - Martijn Pieters
当然,我至少会更改函数的名称,我正在考虑进行一般性重构以更接近OP所尝试的内容。 - Martijn Pieters
loop.run_until_complete无法放置在一个函数中,有什么原因吗?从抽象的角度来看,我不希望用户了解任何关于async或事件循环的知识。我曾经在许多在线示例中看到过main()函数的使用,但现在用户需要知道调用类,将其放入main()函数中,并在所有想要的分页响应时将其传递给事件循环。此外,作为一个async新手,如果您愿意在底部包含更聪明的代码来进行学习,我将感到非常高兴。我会继续研究这个问题,但无论如何,谢谢您的帮助! - slaw
@MartijnPieters提出的“聪明”的重构只有在使用异步生成器时才能实现额外的并行性,即如果您在使用生成器时仍然停留在事件循环内部。如果您在循环中调用run_until_complete,它将无法正常工作。此外,答案已经足够长了 - 这种优化适合作为另一个问题的材料。 - user4815162342
@slaw 是的,绝对可以!您可以启动一个后台线程,创建一个事件循环,然后使用 loop.run_forever() 运行它。asyncio.run_coroutine_threadsafe 可以从任何您喜欢的线程向循环提交协程,并返回一个 concurrent.futures.Future,您可以使用它来查询(或等待)结果。 - user4815162342
显示剩余6条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接