如何在Python同步代码中调用异步函数

75

所以我被锁定在一个符合我的桌面应用程序的Python 3.6.2解释器中。

我想要的是从同步方法或函数调用异步函数。

当从桌面应用程序调用Python函数时,它必须是一个无法等待的普通函数。

从桌面应用程序中,我能够发送一组URL,并且我希望以异步方式发送每个URL的响应。

这是我的尝试,我标记了SyntaxError,我不知道如何绕过它。

import fmeobjects
import asyncio
import aiohttp
import async_timeout
logger = fmeobjects.FMELogFile()
timeout = 10

class FeatureProcessor(object):
    def __init__(self):
        pass
    def input(self, feature):
        urls_and_coords = zip(feature.getAttribute('_list{}._wms'),\
        feature.getAttribute('_list{}._xmin'),\
        feature.getAttribute('_list{}._ymin'),\
        feature.getAttribute('_list{}._xmax'),\
        feature.getAttribute('_list{}._ymax'))
        -> SyntaxError: newfeature = await main(urls_and_coords)
        self.pyoutput(newfeature)
        
    def close(self):
       pass 

async def main(urls):
    loop = asyncio.get_event_loop()
    async with aiohttp.ClientSession(loop=loop) as session:
        feature = loop.run_until_complete(fetch_all(session, urls, loop))
        return feature
        
async def fetch_all(session, urls, loop):
    results = await asyncio.gather(*[loop.create_task(fetch(session, url)) for url in urls])
    return results
    

async def fetch(session, url):
    with async_timeout.timeout(10):
        async with session.get(url[0]) as response:
            newFeature = fmeobjects.FMEFeature()
            response_data = await response
            newFeature.setAttribute('response', response_data)
            newFeature.setAttribute('_xmin',url[1])
            newFeature.setAttribute('_xmax',url[2])
            newFeature.setAttribute('_ymin',url[3])
            newFeature.setAttribute('_ymax',url[4])
            return newFeature

我尝试进行了以下更改:

import fme
import fmeobjects
import asyncio
import aiohttp
import async_timeout
logger = fmeobjects.FMELogFile()

class FeatureProcessor(object):
    def __init__(self):
        pass
    def input(self, feature):
        urls_and_coords = zip(feature.getAttribute('_list{}._wms'),\
        feature.getAttribute('_list{}._xmin'),\
        feature.getAttribute('_list{}._ymin'),\
        feature.getAttribute('_list{}._xmax'),\
        feature.getAttribute('_list{}._ymax'))
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(main(loop, urls_and_coords))
        #feature.setAttribute('result',result)
        self.pyoutput(feature)
        
    def close(self):
       pass 

async def main(loop, urls):
    async with aiohttp.ClientSession(loop=loop) as session:
        return await fetch_all(session, urls, loop)

        
async def fetch_all(session, urls, loop):
    results = await asyncio.gather(*[loop.create_task(fetch(session, url)) for url in urls])
    return results
    

async def fetch(session, url):
    with async_timeout.timeout(10):
        async with session.get(url[0]) as response:
            #newFeature = fmeobjects.FMEFeature()
            response = await response
            #newFeature.setAttribute('response', response_data)
            #newFeature.setAttribute('_xmin',url[1])
            #newFeature.setAttribute('_xmax',url[2])
            #newFeature.setAttribute('_ymin',url[3])
            #newFeature.setAttribute('_ymax',url[4])
            return response, url[1], url[2], url[3], url[4]


        

但是现在我遇到了这个错误:
Python Exception <TypeError>: object ClientResponse can't be used in 'await' 
expression
Traceback (most recent call last):
  File "<string>", line 20, in input
  File "asyncio\base_events.py", line 467, in run_until_complete
  File "<string>", line 29, in main
  File "<string>", line 33, in fetch_all
  File "<string>", line 41, in fetch
TypeError: object ClientResponse can't be used in 'await' expression

你可能想看一下trio库。它比asyncio标准库有一个更加简单直接的接口。 - MisterMiyagi
很酷,看起来它实现了类似于Python 3.7中的asyncio的运行方式。我会去看一下这个。 - Paal Pedersen
4个回答

72

@deceze的回答可能是在Python 3.6中你能做到的最好的。 但在Python 3.7中,你可以直接使用asyncio.run来实现:

newfeature = asyncio.run(main(urls))

它将正确地创建、处理和关闭一个event_loop


7
如果代码已经在 asyncio.run 调用中运行,该怎么办?如果你在一个被 asyncio.run 调用的函数内部使用 asyncio.run,那么你会得到 RuntimeError: asyncio.run() cannot be called from a running event loop - birgersp
4
如果您已经在事件循环内部,您可以通过 result = await main(urls) 直接调用它。 - mgutsche
9
那并没有解决原问题,如果函数未声明为"async def",就不能使用await,而这可能是不可行的,例如对于魔法方法或无法更改的接口。 - Hannes Landeholm
1
可以使用asyncio.get_event_loopasyncio.get_running_loop来检查(和检索)任何正在运行的循环;如果有一个,可以通过调用asyncio.ensure_future来安排协程。 - Berislav Lopac

34
你可以使用事件循环来执行异步函数直到完成:
newfeature = asyncio.get_event_loop().run_until_complete(main(urls_and_coords))

(这种技术已经在 main 中使用了。 我不确定为什么会这样,因为 mainasync 的,所以你可以/应该在那里使用 await fetch_all(...)。)

(这项技术已经在main中使用。我不确定为什么,因为main是一个async函数,所以应该在那里使用await fetch_all(...)。)


3
但是我可能需要重写main函数,因为它已经有一个事件循环了? - Paal Pedersen
1
有趣的一点,我并不确定这是否会引起任何问题。但正如我所写的那样,在async函数内部使用run_until_complete毫无意义,你应该简单地使用await - deceze
1
它对我起作用。 请注意,如果您没有事件循环,因此出现错误:“RuntimeError:在线程'Thread-n'中没有当前事件循环”,则可以在函数中添加asyncio.set_event_loop(asyncio.new_event_loop())以设置事件循环。 - 1ronmat

17

还有一些库可以处理这个问题,并且总是做正确的事情。一个例子是 asgiref.sync,在这里描述了它的方法async_to_syncsync_to_async用于执行这些转换:

from asgiref.sync import async_to_sync

@async_to_sync
async def print_data():
    print(await get_data())

print_data()  # Can be called synchronously

来自asgiref.sync文档的更多信息:

AsyncToSync允许同步子线程停止和等待,同时在主线程的事件循环上调用异步函数,当异步函数完成时返回线程控制。

SyncToAsync使异步代码调用同步函数,在线程池中运行并在同步函数完成时将控制返回到异步协程。

还有其他类似的项目,如 koil


1
这个答案为什么点赞这么少呢?谢谢,它非常适合使用aiohttp编写包和初学者在Python中尝试异步特性。 - Soren V. Raben
9
因为将脆弱的第三方依赖添加到 Python 标准库已经轻松自带的功能中是个不好的想法 - 始终如此。只需直接调用 asyncio.run()asyncio.get_event_loop().run_until_complete() 即可。在任一情况下,都是一个简单的一行代码。asyncio 模块存在是有原因的。</facepalm> - Cecil Curry
4
这不是标准库以任何方式处理的简单事情,正如我在这里所描述的那样。正如周围所评论的那样,函数可以嵌套在其他线程、其他运行循环和同步和异步代码的调用层次结构中,并且当前线程可能已经有一个运行循环,也可能没有。需要一个能够处理和记录所有这些内容的函数。 - fuzzyTew

2

我能够在纯Python 3.10中使用内置的asyncio.run_coroutine_threadsafe使其正常工作。

这对我来说是新的,所以可能会有一些注意事项,例如由于异步方法实际上没有被等待,因此在回调完成之前进程可能()退出(除非您采取措施确保它不会)。

关于在哪里可能发生这种情况的参考,请参见bleak BLE库类BleakClient回调方法disconnected_callback。然后,在回调中尝试使用socket.io客户端的异步版本AsyncClient进行emit

简明的问题/解决方案:

import asyncio
from typing import Callable

Callback = Callable[[int], None]


class SomeSystem:
    """Some library you don't control that is mostly async, but provides a callback that
    is _not_ async."""

    def __init__(self, callback: Callback):
        self._callback = callback

    async def do_something(self):
        """do some work and then call the non-async callback"""
        await asyncio.sleep(1.0)
        self._callback(1)
        await asyncio.sleep(1.0)
        self._callback(2)


async def some_async_method(value: int):
    """some long-running operation normally called by async code"""
    await asyncio.sleep(0.1)
    print(f"long-running: {value}")


async def main():
    """main is async and started as normal with asyncio.run"""
    print("BEGIN main")

    loop = asyncio.get_running_loop()

    def cb(value: int) -> None:
        """This method _cannot_ be async, due to the underlying implementation of SomeSystem."""
        # some_async_method(value)  # RuntimeWarning: coroutine 'some_async_method' was never awaited
        asyncio.run_coroutine_threadsafe(some_async_method(value), loop)  # okay

    system = SomeSystem(cb)
    await system.do_something()

    # maybe ensure the last call to async method is awaited? Without this call, the final callback
    # won't be handled, since it's never being awaited. If anyone knows how to properly wait
    # for this, let me know in the comments!
    await asyncio.sleep(1.0)

    print("END main")


if __name__ == "__main__":
    asyncio.run(main())

输出

BEGIN main
long-running: 1
long-running: 2
END main

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接