如何在另一个asyncio循环中使用asyncio循环

16

我一直在尝试各种方法来使用一个asyncio循环嵌套在另一个asyncio循环中。但是大多数情况下我的测试都以错误结束,例如:

RuntimeError: This event loop is already running

下面是我起始的基本测试代码,您可以看到我正在尝试做什么。在这个测试之后,我尝试了很多东西,但是它们很混乱,所以我认为在寻求帮助时应该保持简单。如果有人能指导我正确的方向,那就太好了。感谢您的时间!

import asyncio

async def fetch(data):
    message = 'Hey {}!'.format(data)
    other_data = ['image_a.com', 'image_b.com', 'image_c.com']
    images = sub_run(other_data)
    return {'message' : message, 'images' : images}

async def bound(sem, data):
    async with sem:
        r = await fetch(data)
        return r

async def build(dataset):
    tasks = []
    sem = asyncio.Semaphore(400)

    for data in dataset:
        task = asyncio.ensure_future(bound(sem, data))
        tasks.append(task)

    r = await asyncio.gather(*tasks)
    return r

def run(dataset):
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(build(dataset))
    responses = loop.run_until_complete(future)
    loop.close()
    return responses

async def sub_fetch(data):
    image = 'https://{}'.format(data)
    return image

async def sub_bound(sem, data):
    async with sem:
        r = await sub_fetch(data)
        return r

async def sub_build(dataset):
    tasks = []
    sem = asyncio.Semaphore(400)

    for data in dataset:
        task = asyncio.ensure_future(sub_bound(sem, data))
        tasks.append(task)

    r = await asyncio.gather(*tasks)
    return r

def sub_run(dataset):
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(sub_build(dataset))
    responses = loop.run_until_complete(future)
    loop.close()
    return responses

if __name__ == '__main__':
    dataset = ['Joe', 'Bob', 'Zoe', 'Howard']
    responses = run(dataset)
    print (responses)
1个回答

17

在正在运行的事件循环中运行 loop.run_until_complete 将会阻塞外部循环,这将使使用 asyncio 的目的失效。因此,asyncio 事件循环不是递归的,也不应该需要递归地运行它们。而是在现有的事件循环上 await 一个任务。

对于您的情况,请删除 sub_run 并简单替换它的使用:

images = sub_run(other_data)

随着:

images = await sub_build(other_data)

并且它将正常工作,运行子协程,并在内部协程完成之前不继续执行外部协程,就像您从同步代码中预期的那样。


2
太棒了!我现在懂了!我可以只使用同一个循环并为该循环创建更多任务。你解释得非常清楚,非常感谢。 - antfuentes87
我们需要将 sub_run() 从同步更改为异步吗? - Emerson Xu
@EmersonXu 的想法是完全摆脱 sub_run - user4815162342
@user4815162342 是的,在这种情况下删除 sub_run() 很清楚。但是,假设我们在 run() 中有许多同步函数调用,我们需要将所有同步函数转换为异步吗? - Emerson Xu
1
@EmersonXu 要么将它们转换,要么使用run_in_executor来执行传统的阻塞代码。 - user4815162342
@Tom 我能理解你的痛苦,拥有两个版本的一切是异步编程中以颜色功能为基础的一个众所周知的问题。这不仅仅是 Python 的问题,Rust、C#、C++、Kotlin 以及在某种程度上 JavaScript 也面临着同样的问题。不幸的是,通过允许递归调用进入事件循环来解决这个问题是行不通的,因为异步挂起信号无法在调用栈的同步部分传播。而且,将一个无颜色的方法(像 Go 一样)加入到一个已有的具有大型扩展生态系统的语言中也不是一个现实的选择。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接