使用Python asyncio从同步函数运行并等待异步函数

18

我的代码中有一个具有属性的类,偶尔需要运行异步代码。有时我需要从异步函数访问该属性,有时需要从同步函数访问-这就是为什么我不希望我的属性是异步的原因。此外,我认为异步属性通常是一种代码异味,请纠正我如果我错了。

我在执行同步属性中的异步方法并阻止进一步执行直到异步方法完成方面存在问题。

以下是示例代码:

import asyncio


async def main():
    print('entering main')
    synchronous_property()
    print('exiting main')


def synchronous_property():
    print('entering synchronous_property')
    loop = asyncio.get_event_loop()
    try:
        # this will raise an exception, so I catch it and ignore
        loop.run_until_complete(asynchronous())
    except RuntimeError:
        pass
    print('exiting synchronous_property')


async def asynchronous():
    print('entering asynchronous')
    print('exiting asynchronous')


asyncio.run(main())

它的输出:

entering main
entering synchronous_property
exiting synchronous_property
exiting main
entering asynchronous
exiting asynchronous

首先,捕获RuntimeError似乎是错误的,但如果我不这样做,就会出现RuntimeError: This event loop is already running异常。

其次,asynchronous()函数在同步函数执行完成后才被执行。我想对异步方法处理的数据进行一些处理,因此需要等待它完成。如果在调用synchronous_property()之后添加await asyncio.sleep(0),它将在main()完成之前调用asynchronous(),但这对我没有帮助。我需要在synchronous_property()完成之前运行asynchronous()

我错了什么?我正在运行python 3.7。


main()和asynchronous()都是异步的。synchronous_property()是同步的,但在异步中调用。因此,您打印的分组看起来是正确的。您捕获的错误警告您正在尝试创建额外的事件循环,这对整个问题非常关键。 - jwal
是的,我知道输出是正确的。我想要进行异步调用以从同步执行并阻止其执行。 - Andrzej Klajnert
4个回答

18

异步编程模块Asyncio的设计本意是不允许嵌套循环,这是有意为之的。但是,你可以在不同的线程中运行另一个事件循环。下面是一种使用线程池的变体,避免每次都创建一个新线程:

import asyncio, concurrent.futures

async def main():
    print('entering main')
    synchronous_property()
    print('exiting main')

pool = concurrent.futures.ThreadPoolExecutor()

def synchronous_property():
    print('entering synchronous_property')
    result = pool.submit(asyncio.run, asynchronous()).result()
    print('exiting synchronous_property', result)

async def asynchronous():
    print('entering asynchronous')
    await asyncio.sleep(1)
    print('exiting asynchronous')
    return 42

asyncio.run(main())
这段代码在同步到异步边界时创建一个新的事件循环,所以如果您经常这样做,请不要期望高性能。可以通过使用asyncio.new_event_loop仅为每个线程创建一个事件循环,并将其缓存在线程本地变量中来进行改进。

@Craftables OP希望他们的同步函数能够任意调用异步函数。由于同步函数本身是从异步函数中调用的,即嵌套,因此在一个事件循环中无法正常工作。 - user4815162342
好的,基本上,没有其他干净的解决方案,除了在任何地方都使用异步函数。总是有一个危险,即在更深的n个函数中需要调用异步函数,并且在链中使用单个非异步函数将使其非常不方便。知道了,谢谢。 - Andrzej Klajnert
1
@AndrzejKlajnert 没错,一切都是异步的假设根深蒂固,因为每个协程必须能够一直暂停到事件循环。递归事件循环不起作用,因为递归循环意味着新任务将在当前任务完成或挂起之前运行,这意味着它处于不一致状态。无法调用异步到同步有时被称为“函数颜色”问题(http://journal.stuffwithstuff.com/2015/02/01/what-color-is-your-function/)。 - user4815162342
1
@unhammer 因为当从异步代码调用 synchronous_property() 时,会引发 RuntimeError - asyncio 循环不支持嵌套。OP 有一个相当具体的要求,即在同步上下文中执行异步代码,即使该同步上下文本身是从异步上下文调用的。这不是 asyncio 的设计方式,这就是为什么需要使用线程的 hack(解决方法)。 - user4815162342
啊哈,谢谢 - 我应该更仔细地阅读原始问题 :) - unhammer
显示剩余2条评论

5
最简单的方法是使用现有的“wheel”,比如asgiref.async_to_sync
from asgiref.sync import async_to_sync

然后:

async_to_sync(main)()

总的来说:

async_to_sync(<your_async_func>)(<.. arguments for async function ..>)

这是一个调用类,它可以将只在事件循环线程上工作的可等待对象转换为同步可调用对象,以在子线程中运行。

如果调用堆栈包含异步循环,则代码将在其中运行。 否则,代码将在新线程的新循环中运行。

无论哪种方式,该线程都会暂停并等待从调用堆栈下方使用SyncToAsync调用的任何线程敏感代码运行,最终一旦异步任务返回就退出。


0

问题陈述似乎有问题。重新陈述问题:如何在一个包含没有异步进程(因此被认为是同步的)的线程和运行在某个事件循环中的异步进程之间进行通信?一种方法是使用两个同步队列。同步进程将其请求/参数放入QtoAsync中,并等待QtoSync。异步进程读取QtoAsync而不等待,如果找到请求/参数,则执行请求,并将结果放入QtoSync。

import queue
QtoAsync = queue.Queue()
QtoSync = queue.Queue()
...

async def asyncProc():
    while True:
        try:
            data=QtoAsync.get_nowait()
            result = await <the async that you wish to execute>
            QtoAsync.put(result) #This can block if queue is full. you can use put_nowait and handle the exception.
        except queue.Empty:
            await asyncio.sleep(0.001) #put a nominal delay forcing this to wait in event loop
....
#start the sync process in a different thread here..
asyncio.run(main()) #main invokes the async tasks including the asyncProc

The sync thread puts it request to async using:
req = <the async that you wish to execute>
QtoAsync.put(req)
result = QtoSync.get()

这应该可以工作。

问题陈述存在的问题: 1. 当使用asyncio.run(或类似方法)启动异步进程时,执行会阻塞直到异步进程完成。在调用asyncio.run之前必须显式地启动一个单独的同步线程。 2. 通常情况下,asyncio进程依赖于循环中的其他asyncio进程。因此,直接从另一个线程调用异步进程是不允许的。交互应该是与事件循环进行,并且使用两个队列是一种方法。


-4
我想要实现异步调用以从同步代码中执行并阻塞其执行。只需将同步函数改为异步函数,并等待异步函数完成即可。异步函数与普通函数一样,您可以在其中放置任何代码。如果仍然有问题,请使用您正在尝试运行的实际代码修改您的问题。
import asyncio


async def main():
    print('entering main')
    await synchronous_property()
    print('exiting main')


async def synchronous_property():
    print('entering synchronous_property')

    await asynchronous()

    # Do whatever sync stuff you want who cares

    print('exiting synchronous_property')


async def asynchronous():
    print('entering asynchronous')
    print('exiting asynchronous')


asyncio.run(main())

2
这就是关键 - 我不想让我的属性变成异步的。它也会被一些同步代码执行,我不想让我的整个代码都是异步函数。 - Andrzej Klajnert

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接