将带有回调函数的Python函数转换为asyncio可等待对象

21

我希望在异步环境中使用PyAudio库,但该库的主要入口只有基于回调的API:

import pyaudio

def callback(in_data, frame_count, time_info, status):
    # Do something with data

pa = pyaudio.PyAudio()
self.stream = self.pa.open(
    stream_callback=callback
)

我的希望是能够像这样使用它:

pa = SOME_ASYNC_COROUTINE()
async def listen():
    async for block in pa:
        # Do something with block

问题是,我不确定如何将这个回调语法转换为在回调触发时完成的future。在JavaScript中,我会使用promise.promisify(),但Python似乎没有类似的东西。

2个回答

21

promisify的等效物在这种情况下不起作用,原因有两个:

  • PyAudio的异步API不使用asyncio事件循环 - 文档指定回调从后台线程调用。这需要采取预防措施来正确地与asyncio通信。
  • 回调无法通过单个future建模,因为它被调用多次,而future只能有一个结果。相反,它必须转换为异步迭代器,就像您示例代码中所示的那样。

以下是一种可能的实现方式:

def make_iter():
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    def put(*args):
        loop.call_soon_threadsafe(queue.put_nowait, args)
    async def get():
        while True:
            yield await queue.get()
    return get(), put

make_iter 返回一个<异步迭代器,放置回调函数>的二元组。返回的对象具有这样的属性:调用回调函数会导致迭代器生成其下一个值(传递给回调函数的参数)。回调函数可以从任意线程调用,因此安全地传递给 pyaudio.open,而异步迭代器应该在 asyncio 协程中使用 async for,在等待下一个值时会被挂起:

async def main():
    stream_get, stream_put = make_iter()
    stream = pa.open(stream_callback=stream_put)
    stream.start_stream()
    async for in_data, frame_count, time_info, status in stream_get:
        # ...

asyncio.get_event_loop().run_until_complete(main())

请注意,根据文档,回调函数还必须返回一个有意义的值,即数据帧和布尔标志的元组。这可以通过改变fill函数以从异步IO端接收数据来实现。由于没有理解该领域可能会让实现毫无意义,因此本示例未包含在内。


谢谢,这非常有帮助!不过,可能会更清晰一些的是,将您的示例make_iter()改用类来实现,因为我最初很难理解它是一个返回函数元组的函数。 - Migwell
2
@Miguel 因为回调函数将在由PyAudio管理的后台线程中调用,而不是事件循环线程中调用。call_soon_threadsafe就是为这种情况设计的。它会在不破坏事件循环的情况下(例如,在没有持有正确锁的情况下破坏其数据结构)将函数安排到事件循环中,并在事件循环在此期间处于休眠状态时唤醒它。 - user4815162342
1
事件循环线程也在操作队列,因为事件循环从队列中删除内容(并且自己使用call_soon来满足自身需求)。但即使没有破坏风险,如果不使用线程安全变量,事件循环也不会被唤醒,因为它不知道需要唤醒。典型的症状是存在一个无关的心跳协程可以“修复”问题,例如这个问题 - user4815162342
1
哦,它唤醒了事件循环!这就解释了为什么我在删除“call_soon_threadsafe”时,我的测试会永远挂起。谢谢! - Migwell
1
根据这个答案,我为sounddevice模块创建了一个示例:https://github.com/spatialaudio/python-sounddevice/blob/master/examples/asyncio_generators.py。这似乎运行得非常好! - Matthias
显示剩余5条评论

4
您可以使用Future

class asyncio.Future(*, loop=None)¶

Future表示异步操作的最终结果。不是线程安全的。

Future是一个可等待对象。协程可以等待Future对象,直到它们有了结果或异常设置,或者直到它们被取消。

通常情况下,Futures用于使低级别的基于回调的代码(例如使用asyncio传输实现的协议)与高级别的async/await代码进行交互。

经验法则是永远不要在面向用户的API中公开Future对象,并且创建Future对象的推荐方法是调用loop.create_future()。这样,替代事件循环实现可以注入其自己优化的Future对象实现。

一个愚蠢的例子:

def my_func(loop):
    fut = loop.create_future()
    pa.open(
        stream_callback=lambda *a, **kw: fut.set_result([a, kw])
    )
    return fut


async def main(loop):
    result = await my_func(loop)  # returns a list with args and kwargs 

我假设pa.open在一个线程或子进程中运行。如果不是这样,您可能还需要使用asyncio.loop.run_in_executor来包装对open的调用。


如果回调被多次调用,每次都有一块数据怎么办?似乎只能调用一次set_result。 - ospider

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接