将作业提交到asyncio事件循环

10

我想从一个线程向 asyncio 事件循环提交作业(就像 run_in_executor 但是反过来)。

这是关于 asyncio 的文档对于 并发和多线程 的解释:

要从不同的线程中调度回调,应该使用BaseEventLoop.call_soon_threadsafe()方法。 下面是将协程函数从另一个线程中调度的示例: loop.call_soon_threadsafe(asyncio.async, coro_func())

这个方法可以正常运行,但是协程的结果会丢失。

相反,可以使用一个函数,将一个回调添加到由 async (或 ensure_future) 返回的 future 中,以便线程可以通过concurrent.futures.Future 访问结果。

是否有特殊原因未在标准库中实现此功能?还是我错过了更简单的实现方式?

1个回答

12

我的请求得到了响应,run_coroutine_threadsafe 函数已在此处实现。

示例:

def target(loop, timeout=None):
    future = asyncio.run_coroutine_threadsafe(add(1, b=2), loop)
    return future.result(timeout)

async def add(a, b):
    await asyncio.sleep(1)
    return a + b

loop = asyncio.get_event_loop()
future = loop.run_in_executor(None, target, loop)
assert loop.run_until_complete(future) == 3

我最初发布了一个concurrent.futures.Executor的子类,它仍然可以被实现为:

class LoopExecutor(concurrent.futures.Executor):
    """An Executor subclass that uses an event loop 
    to execute calls asynchronously."""

    def __init__(self, loop=None):
        """Initialize the executor with a given loop."""
        self.loop = loop or asyncio.get_event_loop()

    def submit(self, fn, *args, **kwargs):
        """Schedule the callable, fn, to be executed as fn(*args **kwargs).
        Return a Future object representing the execution of the callable."""
        coro = asyncio.coroutine(fn)(*args, **kwargs)
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

你想把这个放在问题里面,这样就不会像是一个回答了吗? - Lawrence Benson
1
这其实是我自己问题的部分答案,因为可能有更好的方法来实现同样的事情。 - Vincent
如果你这样看的话,没问题 :) - Lawrence Benson

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接