I have a simple class that uses an async generator to fetch a list of URLs:
```python
import aiohttp
import asyncio
import logging
import sys

LOOP = asyncio.get_event_loop()
N_SEMAPHORE = 3

FORMAT = '[%(asctime)s] - %(message)s'
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)


class ASYNC_GENERATOR(object):

    def __init__(self, n_semaphore=N_SEMAPHORE, loop=LOOP):
        self.loop = loop
        self.semaphore = asyncio.Semaphore(n_semaphore)
        self.session = aiohttp.ClientSession(loop=self.loop)

    async def _get_url(self, url):
        """
        Sends an http GET request to an API endpoint
        """
        async with self.semaphore:
            async with self.session.get(url) as response:
                logger.info(f'Request URL: {url} [{response.status}]')
                read_response = await response.read()
                return {
                    'read': read_response,
                    'status': response.status,
                }

    def get_routes(self, urls):
        """
        Wrapper around _get_url (multiple urls asynchronously)

        This returns an async generator
        """
        # Asynchronous http GET requests
        coros = [self._get_url(url) for url in urls]
        futures = asyncio.as_completed(coros)
        for future in futures:
            yield self.loop.run_until_complete(future)

    def close(self):
        self.session._connector.close()
```
When I execute the main part of this code:
```python
if __name__ == '__main__':
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    for response in responses:
        response = next(ag.get_routes(['https://httpbin.org/get']))
    ag.close()
```
The log output is:
```
[2018-05-15 12:59:49,228] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 12:59:49,235] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 12:59:49,242] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 12:59:49,285] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 12:59:49,290] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 12:59:49,295] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 12:59:49,335] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 12:59:49,340] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 12:59:49,347] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 12:59:49,388] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 12:59:49,394] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,444] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,503] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,553] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,603] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,650] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,700] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,825] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,875] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,922] - Request URL: https://httpbin.org/get [200]
```
Since `responses` is an async generator, I expected it to yield a single response from the async generator (with the request only being sent when it is actually yielded), then send a separate request to the endpoint without the `x` parameter, and then yield the next response from the async generator. It should keep alternating between a request with the `x` parameter and one without it. Instead, it yields all the responses from the async generator with the `x` parameter first, followed by all the https requests without a parameter.
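For context, here is a minimal sketch (my own, not from the code above) illustrating why this happens: `asyncio.as_completed` wraps every coroutine in a task as soon as iteration begins, so all of them are scheduled up front, before the first result is yielded. The `work` coroutine and the sleep durations are stand-ins for the real HTTP requests:

```python
import asyncio

started = []

async def work(i):
    # record when this coroutine actually begins running
    started.append(i)
    await asyncio.sleep(0.01 * (i + 1))
    return i

async def main():
    first_snapshot = None
    results = []
    # as_completed wraps every coroutine in a Task as soon as iteration
    # begins, so all of them are in flight before the first result arrives
    for fut in asyncio.as_completed([work(i) for i in range(3)]):
        results.append(await fut)
        if first_snapshot is None:
            first_snapshot = sorted(started)
    return first_snapshot, results

first_snapshot, results = asyncio.run(main())
print(first_snapshot)  # all three coroutines started before the first yield
print(results)
```

By the time the first result is available, every coroutine has already started, which matches the "all requests fire at once" pattern in the log above.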
Something similar happens when I do the following:
```python
ag = ASYNC_GENERATOR()
urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
responses = ag.get_routes(urls)
next(responses)
response = next(ag.get_routes(['https://httpbin.org/get']))
ag.close()
```
The log output is:
```
[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,656] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 13:08:38,681] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 13:08:38,695] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 13:08:38,717] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 13:08:38,741] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 13:08:38,750] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 13:08:38,773] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 13:08:38,792] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 13:08:38,803] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]
```
What I want instead is:
```
[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]
```
Sometimes I want to retrieve all the responses before doing anything else. At other times, though, I want to interleave and process intermediate requests before the generator yields its next item (i.e., the generator returns results from paginated search results, and I want to process each page's links further before moving on to the next page).

What changes do I need to make to achieve the desired result?
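One way to get the interleaved behavior (a sketch of my own, under the assumption that giving up batch concurrency is acceptable) is to skip `as_completed` entirely and drive one coroutine per `next()` call. Here `fetch` is a hypothetical stand-in for `_get_url`:

```python
import asyncio

async def fetch(url):
    # stand-in for the real _get_url (name and payload are hypothetical)
    await asyncio.sleep(0.01)
    return f'fetched {url}'

def get_routes_lazily(loop, urls):
    # drive one coroutine at a time: a request is only sent when the
    # consumer calls next() on this generator
    for url in urls:
        yield loop.run_until_complete(fetch(url))

loop = asyncio.new_event_loop()
results = []
for item in get_routes_lazily(loop, ['page-0', 'page-1']):
    results.append(item)
    # an interleaved request runs here before the generator resumes
    results.append(loop.run_until_complete(fetch('extra')))
loop.close()
print(results)
```

Each iteration alternates between a "page" request and an interleaved one, at the cost of the requests no longer overlapping.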
`responses` is not an async generator; it's a regular, normal, synchronous generator function. To be async, you'd have to use `async def responses(...):` - Martijn Pieters

Should `async def responses(...):` go between `get_routes` and `_get_url`? Or do I need to use it in `__main__`? And what do I need to await inside `async def responses(...):`? Thank you for your patience, as I'm still learning this tricky async programming! - slaw

`as_completed` is itself a plain generator, but one (oddly) designed to be used with asyncio, so it is natural to wrap it in another plain generator. Synchronously yielding the coroutines produced by `as_completed` via `run_until_complete` is a legitimate use, but I wouldn't recommend it. - user4815162342