注意:此答案涵盖了CPython和asyncio框架。但是,这些概念应该适用于其他Python实现以及其他异步框架。
如何编写C函数以便可以await它?
编写一个C函数使其结果可await的最简单方法是让它返回一个已经创建好的可await对象,例如
asyncio.Future
。在返回Future之前,代码必须安排通过某些异步机制设置future的结果。所有这些基于协程的方法都假定您的程序正在运行某个事件循环下,该事件循环知道如何调度协程。
但仅返回一个future并不总是足够的-也许我们想定义一个具有任意数量挂起点的对象。返回future仅暂停一次(如果返回的future尚未完成),在future完成后恢复一次,就这样。与包含多个await的async def等效的可await对象不能通过返回future来实现,它必须实现协程通常实现的协议。这有点像迭代器实现自定义__next__并可用作生成器的替代品。
定义自定义可await对象
为了定义我们自己的可等待类型,我们可以参考PEP 492,该
指定可以传递给
await
的对象。除了使用
async def
定义的Python函数外,用户定义的类型可以通过定义
__await__
特殊方法使对象成为可等待的,Python/C将其映射到
PyTypeObject
结构的
tp_as_async.am_await
部分。这意味着在Python/C中,您必须执行以下操作:
- 为您的扩展类型的
tp_as_async
字段指定一个非NULL值。
- 使其
am_await
成员指向一个C函数,该函数接受您类型的实例并返回实现迭代器协议的另一个扩展类型的实例,即定义tp_iter
(简单定义为PyIter_Self
)和tp_iternext
。
- 迭代器的
tp_iternext
必须推进协程的状态机。每个非异常返回从tp_iternext
对应于挂起,并且最终的StopIteration
异常表示协程的最终返回。返回值存储在StopIteration
的value
属性中。
为了使协程有用,它还必须能够与驱动它的事件循环进行通信,以便在暂停后指定何时恢复。大多数由asyncio定义的协程都希望在asyncio事件循环下运行,并在内部使用
asyncio.get_event_loop()
(和/或接受显式的
loop
参数)来获取其服务。
示例协程
为了说明Python/C代码需要实现什么,让我们考虑一个简单的协程,表示为Python的
async def
,例如
asyncio.sleep()
的等效函数。{{}}
async def my_sleep(n):
loop = asyncio.get_event_loop()
future = loop.create_future()
loop.call_later(n, future.set_result, None)
await future
my_sleep
创建一个Future
,安排它在n秒后完成(其结果变为设置),并在未来完成之前暂停自身。最后一部分使用await
,其中await x
的意思是“允许x
决定我们现在是挂起还是继续执行”。不完整的future总是决定挂起,并且asyncio Task
协程驱动程序特别处理已提交的futures以无限期地挂起它们并连接它们的完成以恢复任务。其他事件循环的挂起机制(如curio等)可能在细节上有所不同,但基本思想是相同的: await
是可选的执行挂起。
__await__()
返回生成器
要将此转换为C,我们必须摆脱魔术async def
函数定义,以及await
挂起点。删除async def
相当简单:等效的普通函数只需要返回实现__await__
的对象:
def my_sleep(n):
return _MySleep(n)
class _MySleep:
def __init__(self, n):
self.n = n
def __await__(self):
return _MySleepIter(self.n)
由
my_sleep()
返回的
_MySleep
对象的
__await__
方法将自动被
await
运算符调用,以将可等待对象(传递给
await
的任何内容)转换为迭代器。该迭代器将用于询问等待的对象是否选择挂起或提供值。这很像
for o in x
语句调用
x.__iter __()
将可迭代的
x
转换为具体的
iterator。
当返回的迭代器选择挂起时,它只需要产生一个值。如果有任何意义,该值的含义将由协程驱动程序解释,通常是事件循环的一部分。当迭代器选择停止执行并从
await
返回时,它需要停止迭代。使用生成器作为方便的迭代器实现,
_MySleepIter
将如下所示:
def _MySleepIter(n):
loop = asyncio.get_event_loop()
future = loop.create_future()
loop.call_later(n, future.set_result, None)
for x in future.__await__():
yield x
作为 await x
对应 yield from x.__await__()
,我们的生成器必须耗尽由 future.__await__()
返回的迭代器。由 Future.__await__
返回的迭代器将在未完成时产生 yield,并返回 future 的结果(虽然我们在这里忽略了,但实际上 yield from
提供了)。
__await__()
返回自定义迭代器
在 C 中实现 my_sleep
的最后一个障碍是使用 generator 用于 _MySleepIter
。幸运的是,任何生成器都可以被转换为一个有状态的迭代器,其 __next__
执行代码片段直到下一个 await 或 return。 __next__
实现了生成器代码的状态机版本,其中 yield
通过返回值表示,return
通过引发 StopIteration
表示。例如:
class _MySleepIter:
def __init__(self, n):
self.n = n
self.state = 0
def __iter__(self):
return self
def __next__(self):
if self.state == 0:
loop = asyncio.get_event_loop()
self.future = loop.create_future()
loop.call_later(self.n, self.future.set_result, None)
self.state = 1
if self.state == 1:
if not self.future.done():
return next(iter(self.future))
self.state = 2
if self.state == 2:
raise StopIteration
raise AssertionError("invalid state")
翻译为C语言
上述内容需要输入相当多的字符,但它可以工作,并且只使用可以通过本机Python/C函数定义的结构。
实际上,将这两个类翻译为C语言非常简单,但超出了本答案的范围。
napi_queue_async_work
这样的东西,用于与C语言中的事件循环进行交互? - Alex Rothberg