async
/
await
的目的是交替执行
任务,而不是函数/生成器。例如,当您使用
await asyncio.sleep(1)
时,您当前的协程会随着 sleep 被延迟。同样,
async for
会延迟其协程,直到下一个项目准备就绪。
为了运行您的单独功能,必须将每个部分创建为单独的任务。使用
Queue
在它们之间交换项目 - 任务只会被延迟,直到它们已经交换了一个项目。
from asyncio import Queue, sleep, run, gather
async def g():
for i in range(3):
await sleep(1)
yield i
async def producer(queue: Queue):
async for i in g():
print('send', i)
await queue.put(i)
await queue.put(None)
async def consumer(queue: Queue):
x = await queue.get()
while x is not None:
print('got', x)
await sleep(2)
x = await queue.get()
async def main():
queue = Queue()
await gather(
producer(queue),
consumer(queue),
)
run(main())
如果您经常需要这个功能,您也可以将其放入一个帮助对象中,该对象包装了异步可迭代器。该帮助程序封装队列和单独的任务。您可以直接在async for
语句中应用该帮助程序到异步可迭代器上。
from asyncio import Queue, sleep, run, ensure_future
async def _enqueue_items(async_iterable, queue: Queue, sentinel):
async for item in async_iterable:
await queue.put(item)
await queue.put(sentinel)
async def concurrent(async_iterable):
"""Concurrently fetch items from ``async_iterable``"""
queue = Queue()
sentinel = object()
consumer = ensure_future(
_enqueue_items(async_iterable, queue, sentinel)
)
try:
item = await queue.get()
while item is not sentinel:
yield item
item = await queue.get()
finally:
consumer.cancel()
async def g():
for i in range(3):
await sleep(1)
yield i
async def main():
async for x in concurrent(g()):
print(x)
await sleep(2)
run(main())
g
中运行sleep
,因此在main
的sleep之前。async
/await
语法存在是为了同时执行多个任务,而不是同时执行一个任务。 你只有一个任务,因此没有任何东西可以同时运行。 - MisterMiyagig
在main
睡眠时准备好下一个i
!sleep
是我实际代码中的一些实际工作吗? - cjs840312