如何在Python中创建异步生成器?

14

我正在尝试将这段Python2.7代码重写为新的异步编程模式:

def get_api_results(func, iterable):
    pool = multiprocessing.Pool(5)
    for res in pool.map(func, iterable):
        yield res

map() 会一直阻塞直到所有结果都被计算完毕,因此我尝试将其重写为异步实现,该实现会在结果准备好后立即生成。就像 map() 一样,返回的值必须按照 iterable 相同的顺序返回。我尝试了这个(由于遗留的身份验证要求,我需要使用 requests):

import requests

def get(i):
    r = requests.get('https://example.com/api/items/%s' % i)
    return i, r.json()

async def get_api_results():
    loop = asyncio.get_event_loop()
    futures = []
    for n in range(1, 11):
        futures.append(loop.run_in_executor(None, get, n))
    async for f in futures:
        k, v = await f
        yield k, v

for r in get_api_results():
    print(r)

但是在Python 3.6中,我得到了:

  File "scratch.py", line 16, in <module>
    for r in get_api_results():
TypeError: 'async_generator' object is not iterable

我该如何实现这个目标?


1
不要将事件循环放在异步代码块中,异步代码必须由事件循环运行,而不是相反的。 - Martijn Pieters
谢谢!我肯定在这里漏掉了什么。我看到的所有事件循环示例都使用loop.run_until_complete(get_api_results()),在我看来这将使调用阻塞并且丢失结果。 - Erik Cederstrand
通常情况下,您会有更多的协程来处理结果,事件循环驱动这些协程。 - Martijn Pieters
没错,但是循环并没有在那里被驱动loop.run_until_complete()调用坚定地位于异步代码路径之外。说实话,我会让main()接受一个loop参数,以便从loop.run_until_complete(main(loop))传递。 - Martijn Pieters
这是标准的 Stack Overflow 失败模式,标题过于通用,实际上是一个调试问题。虽然这是我在 Google 上搜索“async generator python”的第四个最高结果,但对于想要了解标题中问题答案的人来说几乎没有用处!一个不那么虚假通用的标题将会是一个重大的改进。 - Mark Amery
显示剩余2条评论
2个回答

13
关于你的旧版(2.7)代码 - 多进程被认为是一个强大的替代品,可以同时处理CPU密集型任务,而线程模块则不太适用。你的代码可能不会受到CPU限制 - 因为它只需要进行HTTP请求 - 使用线程可能已经足以解决你的问题。
然而,Python 3+有一个很好的模块concurrent.futures,通过酷炫的Executor类提供了更清晰的API,而不是直接使用threading。这个模块也可作为一个外部包在python 2.7中使用。
以下代码适用于python 2和python 3:
# For python 2, first run:
#
#    pip install futures
#
from __future__ import print_function

import requests
from concurrent import futures

URLS = [
    'http://httpbin.org/delay/1',
    'http://httpbin.org/delay/3',
    'http://httpbin.org/delay/6',
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://some-made-up-domain.coooom/',
]


def fetch(url):
    r = requests.get(url)
    r.raise_for_status()
    return r.content


def fetch_all(urls):
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = {executor.submit(fetch, url): url for url in urls}
        print("All URLs submitted.")
        for future in futures.as_completed(future_to_url):
            url = future_to_url[future]
            if future.exception() is None:
                yield url, future.result()
            else:
                # print('%r generated an exception: %s' % (
                # url, future.exception()))
                yield url, None


for url, s in fetch_all(URLS):
    status = "{:,.0f} bytes".format(len(s)) if s is not None else "Failed"
    print('{}: {}'.format(url, status))

这段代码使用基于线程的 futures.ThreadPoolExecutor,其中很多魔法都在此处使用的 as_completed() 中。
你上面的 Python 3.6 代码使用了 run_in_executor(),它创建了一个 futures.ProcessPoolExecutor(),并没有真正使用异步 IO!!
如果你真的想使用 asyncio,你需要使用支持 asyncio 的 HTTP 客户端,比如 aiohttp。以下是一个示例代码:
import asyncio

import aiohttp


async def fetch(session, url):
    print("Getting {}...".format(url))
    async with session.get(url) as resp:
        text = await resp.text()
    return "{}: Got {} bytes".format(url, len(text))


async def fetch_all():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, "http://httpbin.org/delay/{}".format(delay))
                 for delay in (1, 1, 2, 3, 3)]
        for task in asyncio.as_completed(tasks):
            print(await task)
    return "Done."


loop = asyncio.get_event_loop()
resp = loop.run_until_complete(fetch_all())
print(resp)
loop.close()

正如您所看到的,asyncio还有一个as_completed(),现在使用真正的异步IO,在一个进程的一个线程上利用。


1
由于协程是生成器,因此在其中不可能使用简单的“yield”。但这是可能的。https://dev59.com/XVoU5IYBdhLWcg3wV2C8#37550568 - im7mortal
@im7mortal:谢谢,我已经从答案中删除了这部分内容。 - Udi

8

不要将事件循环放在另一个协程中。事件循环是异步代码的最外层“驱动程序”,应该同步运行。

如果需要处理获取到的结果,请编写更多的协程来处理。它们可以从队列中获取数据,也可以直接驱动获取操作。

例如,您可以拥有一个主函数来获取和处理结果:

async def main(loop): 
    for n in range(1, 11):
        future = loop.run_in_executor(None, get, n)
        k, v = await future
        # do something with the result

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))

我会使用类似于aiohttp这样的异步库,使get()函数也正确异步化,这样您就不必使用执行器了。请注意保留HTML标记。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接