有没有一种方法可以从C++中调用一个异步的Python方法?

15

我们有一个基于Python的代码库,使用了asyncio和协程(async方法和await),我想要做的是从一个被拉入Python中的C++类中调用其中的一个方法(使用pybind11)。

假设有以下代码:

class Foo:
  async def bar(a, b, c):
    # some stuff
    return c * a

假设代码从Python被调用,并且有一个io循环处理它,那么在某个时间点,代码会跳转到C++领域,在这里需要调用bar方法 - 那么如何在C++中await这个方法的结果?


在重新阅读了您对已删除答案的评论后,我很好奇您要放置await的调用站点(实际上看起来像什么)。您想在C++中实现一个async def吗? - user4815162342
@user4815162342 - 这是正确的,在Python领域中,有async def方法,其中在某些点上有await ..用于其他异步操作。因此现在 - 与其有一个async Python方法,我有一个C++函数,并且想要在其中实现相同的效果(好吧,类似的东西)。 - Nim
4个回答

10

可以在C++中实现Python协程,但需要一些工作。您需要做的是将异步函数转换为状态机,这通常是解释器(在静态语言中是编译器)为您完成的工作。考虑一个非常简单的协程:

async def coro():
    x = foo()
    y = await bar()
    baz(x, y)
    return 42

调用 coro() 不会运行任何其代码,但它会生成一个可等待对象,该对象可以启动并多次恢复。 (但通常您看不到这些操作,因为它们由事件循环透明地执行。)可等待对象可以以两种不同的方式响应:通过 1)暂停或 2)指示已完成。

在协程内部,await 实现了暂停。如果使用生成器实现协程,则 y = await bar() 将转换为:

# pseudo-code for y = await bar()

_bar_iter = bar().__await__()
while True:
    try:
        _suspend_val = next(_bar_iter)
    except StopIteration as _stop:
        y = _stop.value
        break
    yield _suspend_val

换句话说,await会暂停(挂起),直到等待的对象完成。等待的对象通过引发StopIteration并将返回值藏在其value属性中来表明它已完成。如果在循环中使用yield听起来像yield from,那么你就是正确的,这就是为什么await经常被描述为yield from的原因。然而,在C++中我们没有yield(但还没有),所以我们必须将上述内容集成到状态机中。
要从头实现async def,我们需要有一个满足以下约束条件的类型:
- 构造时不做太多事情 - 通常只存储接收到的参数 - 具有返回可迭代对象的__await__方法,可以是self; - 具有返回迭代器的__iter__,可以再次是self; - 具有一个__next__方法,其调用实现状态机的一步,其中返回意味着挂起,引发StopIteration意味着完成。
上述协程在__next__中的状态机将包括三个状态:
1. 初始状态,当它调用foo()同步函数时 2. 下一个状态是等待bar()协程挂起的时间,传播挂起到调用者。一旦bar()返回值,我们就可以立即继续调用baz()并通过StopIteration异常返回值。 3. 最后一个状态只是引发异常,通知调用者协程已完成。
因此,上面展示的async def coro()定义可以看作是以下代码的语法糖:
class coro:
    def __init__(self):
        self._state = 0

    def __iter__(self):
        return self

    def __await__(self):
        return self

    def __next__(self):
        if self._state == 0:
            self._x = foo()
            self._bar_iter = bar().__await__()
            self._state = 1

        if self._state == 1:
            try:
                suspend_val = next(self._bar_iter)
                # propagate the suspended value to the caller
                # don't change _state, we will return here for
                # as long as bar() keeps suspending
                return suspend_val
            except StopIteration as stop:
                # we got our value
                y = stop.value
            # since we got the value, immediately proceed to
            # invoking `baz`
            baz(self._x, y)
            self._state = 2
            # tell the caller that we're done and inform
            # it of the return value
            raise StopIteration(42)

        # the final state only serves to disable accidental
        # resumption of a finished coroutine
        raise RuntimeError("cannot reuse already awaited coroutine")

我们可以使用真正的 asyncio 来测试我们的“协程”是否可行:
>>> class coro:
... (definition from above)
...
>>> def foo():
...     print('foo')
...     return 20
... 
>>> async def bar():
...     print('bar')
...     return 10
... 
>>> def baz(x, y):
...     print(x, y)
... 
>>> asyncio.run(coro())
foo
bar
20 10
42

剩下的部分是在Python/C或pybind11中编写coro类。

这是一个很好的解决方案,我会尝试并回复!谢谢。 - Nim
@Nim 谢谢。这个老答案中还有一些附加细节,不过里面的代码太依赖asyncio了,在你的用例中其实并不需要(虽然如果需要还是可以做到的)。我认为这个答案更好地捕捉了核心思想。 - user4815162342

5
这不是pybind11,但你可以直接从C语言调用异步函数。只需使用add_done_callback将回调添加到future即可。我假设pybind11允许你调用Python函数,所以步骤应该是相同的。
请参考以下链接:https://github.com/MarkReedZ/mrhttp/blob/master/src/mrhttp/internals/protocol.c
result = protocol_callPageHandler(self, r->func, request))

现在异步函数的结果是一个future。就像在Python中一样,您需要使用生成的future调用create_task:
PyObject *task;
if(!(task = PyObject_CallFunctionObjArgs(self->create_task, result, NULL))) return NULL;

接下来,您需要使用add_done_callback添加回调函数:

add_done_callback = PyObject_GetAttrString(task, "add_done_callback")
PyObject_CallFunctionObjArgs(add_done_callback, self->task_done, NULL)

self->task_done是在Python中注册的一个C函数,当任务完成时会被调用。


0

单次回调

首先,从pybind11中获取一个基本的回调函数。以下是一些参考资料:

下一步是让一个 asyncio 的 Future POC / 示例正常工作。请参考: 现在你已经弄清楚了这两个核心机制,把它们结合起来吧!
关键的“技巧”是从Python中调用一个名为“setCallback”的pybind函数,并将你的future对象的“set_result”函数传递给它。这样,你就可以在C++中得到一个回调,进而输入到asyncio Future中!
然而,如果你在C++中使用多线程,可能会遇到一个问题...未来的值可能已经设置好了,但是asyncio事件循环可能没有响应。为了解决这个问题,我建议添加另一个回调来“唤醒”循环。参见:https://dev59.com/Hrj4oIgBc1ULPQZFITUU#76724344 未经测试的示例代码片段 C++
#include <string>
#include <thread>

#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <pybind11/functional.h>
namespace py = pybind11;

typedef std::function<void(const std::string &)> MessageHandler;
typedef std::function<void()> WakeHandler;

class MyHelper
{
public:
    MyHelper(){}
    ~MyHelper(){}

    void setMessageHandler( const MessageHandler handler )
    { messageHandler_ = handler; }

    void setWakeHandler( const WakeHandler handler )
    { wakeHandler_ = handler; }

    void test()
    {   
        std::thread t([this](){
            std::this_thread::sleep_for( std::chrono::seconds( 3 ) );
            if( messageHandler_ ) messageHandler_( "Hello from C++!" );
            if( wakeHandler_ ) wakeHandler_();
        });
        t.detach();    
    }

    static void pybindInit( py::module &m )
    {
        py::class_<MyHelper>( m, "MyHelper" )
            .def( py::init<>() )
            .def( "setMessageHandler", &MyClass::setMessageHandler,
                  py::call_guard<py::gil_scoped_release>() )
            .def( "setWakeHandler"   , &MyClass::setWakeHandler,
                  py::call_guard<py::gil_scoped_release>() )
            .def( "test"             , &MyClass::test,
                  py::call_guard<py::gil_scoped_release>() )
        ;
    }

private:
    MessageHandler messageHandler_;
    WakeHandler wakeHandler_;
};

Python:
import MyHelper

async def main( helper ):
    global event_loop
    event_loop = asyncio.get_running_loop()
    helper.test()
    await asyncio.gather( get_help( helper ), other_coro() )
    print( "Success!" )

def wake():
    asyncio.run_coroutine_threadsafe( asyncio.sleep( 0 ), event_loop )

async def get_help( helper ):        
    future = event_loop.create_future()
    helper.setMessageHandler( future.set_result )
    helper.setWakeHandler( wake )
    print( await future )

async def other_coro():
    for i in range(5):
        await asyncio.sleep( 1 )
        print( "some other python work...." )

if __name__ == "__main__": asyncio.run( main( MyHelper() ) )

重复的回调函数

好的... 所以这可能不是"全部",具体取决于您的需求... 一个 Future 的 set_result 只能触发一次。:( 此外,您可能希望从 C++ 中"取消"它或返回异常...

如果您的目标是有时调用 C++ "寻求帮助" 并异步获取结果,我所描述/演示的应该可以工作。但如果您希望 C++ 重复发送 "事件" 或异步 "消息" 到您的 Python 脚本中,您需要在双方都进行更多的工作来扩展这个简单的设计。简而言之,在 Python 方面,您需要不断创建新的 future 对象并将它们传递给 C++ 用于每个回调。您还需要确保在 C++ 方面仅使用每个回调一次,并可选择在该端阻塞,直到新的回调被分配且在 Python 端"准备就绪"。


-2

对于这样的事情,如果我不想深入研究CPython API,我只是用Python编写我的代码,并使用pybind的Python接口调用它。

一个例子: https://github.com/RobotLocomotion/drake/blob/a7700d3/bindings/pydrake/init.py#L44 https://github.com/RobotLocomotion/drake/blob/a7700d3/bindings/pydrake/pydrake_pybind.h#L359

针对这种情况,也许你可以这样做:

# cpp_helpers.py
def await_(obj):
    return await obj

py::object py_await = py::module::import("cpp_helpers").attr("await_");
auto result = py::cast<MyResult>(py_await(py_obj));

然而,这种方法很可能比上述解决方案的性能要差。

3
非异步函数中无法使用async await。 - pwuertz

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接