Python - 如何将C函数实现为可等待的(协程)

24

环境:协作的C语言RTOS和micropython虚拟机是任务之一。

为了使虚拟机不阻塞其他RTOS任务,我在vm.c:DISPATCH()中插入RTOS_sleep(),这样每次执行完字节码后,虚拟机就会放弃控制权并转到下一个RTOS任务。

我创建了一个uPy接口,使用生产者-消费者设计模式异步从物理数据总线(可以是CAN、SPI、以太网)获取数据。

在uPy中的用法:

can_q = CANbus.queue()
message = can_q.get()

在C语言中的实现是这样的:can_q.get()不会阻塞RTOS:它会轮询一个C队列,如果没有接收到消息,就会调用RTOS_sleep()来让另一个任务有机会填充队列。这是同步的,因为C队列只能被另一个RTOS任务更新,并且RTOS任务只在调用RTOS_sleep()时切换,即协作式

C语言的实现基本上是:

// gives chance for c-queue to be filled by other RTOS task
while(c_queue_empty() == true) RTOS_sleep(); 
return c_queue_get_message();

虽然Python语句can_q.get()不会阻塞RTOS,但它会阻塞uPy脚本。

我想重写它,使其能够与async def协程一起使用,并且不会阻塞uPy脚本。

不确定语法,但类似于以下内容:

can_q = CANbus.queue()
message = await can_q.get()

问题

我该如何编写一个C函数,以便可以在其上使用await?

我更倾向于得到CPython和MicroPython的答案,但是我也可以接受仅针对CPython的答案。

1个回答

34
注意:此答案涵盖了CPython和asyncio框架。但是,这些概念应该适用于其他Python实现以及其他异步框架。
如何编写C函数以便可以await它?
编写一个C函数使其结果可await的最简单方法是让它返回一个已经创建好的可await对象,例如asyncio.Future。在返回Future之前,代码必须安排通过某些异步机制设置future的结果。所有这些基于协程的方法都假定您的程序正在运行某个事件循环下,该事件循环知道如何调度协程。
但仅返回一个future并不总是足够的-也许我们想定义一个具有任意数量挂起点的对象。返回future仅暂停一次(如果返回的future尚未完成),在future完成后恢复一次,就这样。与包含多个await的async def等效的可await对象不能通过返回future来实现,它必须实现协程通常实现的协议。这有点像迭代器实现自定义__next__并可用作生成器的替代品。
定义自定义可await对象
为了定义我们自己的可等待类型,我们可以参考PEP 492,该指定可以传递给await的对象。除了使用async def定义的Python函数外,用户定义的类型可以通过定义__await__特殊方法使对象成为可等待的,Python/C将其映射到PyTypeObject结构的tp_as_async.am_await部分。这意味着在Python/C中,您必须执行以下操作:
  • 为您的扩展类型的tp_as_async字段指定一个非NULL值。
  • 使其am_await成员指向一个C函数,该函数接受您类型的实例并返回实现迭代器协议的另一个扩展类型的实例,即定义tp_iter(简单定义为PyIter_Self)和tp_iternext
  • 迭代器的tp_iternext必须推进协程的状态机。每个非异常返回从tp_iternext对应于挂起,并且最终的StopIteration异常表示协程的最终返回。返回值存储在StopIterationvalue属性中。
为了使协程有用,它还必须能够与驱动它的事件循环进行通信,以便在暂停后指定何时恢复。大多数由asyncio定义的协程都希望在asyncio事件循环下运行,并在内部使用asyncio.get_event_loop()(和/或接受显式的loop参数)来获取其服务。
示例协程
为了说明Python/C代码需要实现什么,让我们考虑一个简单的协程,表示为Python的async def,例如asyncio.sleep()的等效函数。{{}}
async def my_sleep(n):
    loop = asyncio.get_event_loop()
    future = loop.create_future()
    loop.call_later(n, future.set_result, None)
    await future
    # we get back here after the timeout has elapsed, and
    # immediately return

my_sleep创建一个Future,安排它在n秒后完成(其结果变为设置),并在未来完成之前暂停自身。最后一部分使用await,其中await x的意思是“允许x决定我们现在是挂起还是继续执行”。不完整的future总是决定挂起,并且asyncio Task协程驱动程序特别处理已提交的futures以无限期地挂起它们并连接它们的完成以恢复任务。其他事件循环的挂起机制(如curio等)可能在细节上有所不同,但基本思想是相同的: await是可选的执行挂起。

__await__()返回生成器

要将此转换为C,我们必须摆脱魔术async def函数定义,以及await挂起点。删除async def相当简单:等效的普通函数只需要返回实现__await__的对象:

def my_sleep(n):
    return _MySleep(n)

class _MySleep:
    def __init__(self, n):
        self.n = n

    def __await__(self):
        return _MySleepIter(self.n)

my_sleep()返回的_MySleep对象的__await__方法将自动被await运算符调用,以将可等待对象(传递给await的任何内容)转换为迭代器。该迭代器将用于询问等待的对象是否选择挂起或提供值。这很像for o in x语句调用x.__iter __()将可迭代的x转换为具体的iterator
当返回的迭代器选择挂起时,它只需要产生一个值。如果有任何意义,该值的含义将由协程驱动程序解释,通常是事件循环的一部分。当迭代器选择停止执行并从await返回时,它需要停止迭代。使用生成器作为方便的迭代器实现,_MySleepIter将如下所示:
def _MySleepIter(n):
    loop = asyncio.get_event_loop()
    future = loop.create_future()
    loop.call_later(n, future.set_result, None)
    # yield from future.__await__()
    for x in future.__await__():
        yield x

作为 await x 对应 yield from x.__await__(),我们的生成器必须耗尽由 future.__await__() 返回的迭代器。由 Future.__await__ 返回的迭代器将在未完成时产生 yield,并返回 future 的结果(虽然我们在这里忽略了,但实际上 yield from 提供了)。

__await__() 返回自定义迭代器

在 C 中实现 my_sleep 的最后一个障碍是使用 generator 用于 _MySleepIter。幸运的是,任何生成器都可以被转换为一个有状态的迭代器,其 __next__ 执行代码片段直到下一个 await 或 return。 __next__ 实现了生成器代码的状态机版本,其中 yield 通过返回值表示,return 通过引发 StopIteration 表示。例如:

class _MySleepIter:
    def __init__(self, n):
        self.n = n
        self.state = 0

    def __iter__(self):  # an iterator has to define __iter__
        return self

    def __next__(self):
        if self.state == 0:
            loop = asyncio.get_event_loop()
            self.future = loop.create_future()
            loop.call_later(self.n, self.future.set_result, None)
            self.state = 1
        if self.state == 1:
            if not self.future.done():
                return next(iter(self.future))
            self.state = 2
        if self.state == 2:
            raise StopIteration
        raise AssertionError("invalid state")

翻译为C语言

上述内容需要输入相当多的字符,但它可以工作,并且只使用可以通过本机Python/C函数定义的结构。

实际上,将这两个类翻译为C语言非常简单,但超出了本答案的范围。


1
Python是否提供像node api的napi_queue_async_work这样的东西,用于与C语言中的事件循环进行交互? - Alex Rothberg

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接