能将concurrent.futures.Future转换为asyncio.Future吗?

15

我写了多年的多线程代码后,正在练习asyncio。注意到一些我觉得奇怪的事情,就是在asyncioconcurrent中都有一个Future对象。

from asyncio import Future
from concurrent.futures import Future

每个都有自己的角色...

我的问题是,我能否将concurrent.future.Future转换为asyncio.Future(或反之亦然)?


不确定是否完全重复,但肯定有关联:https://dev59.com/0l0a5IYBdhLWcg3w07kt - mkrieger1
谢谢。我已经阅读了,那里有一个比较,但对我来说还没有答案。 - Aaron_ab
mkrieger1的帖子很有帮助。我不确定你所说的转移到其他是什么意思。 - Kevin He
3个回答

16
我的问题是是否可以将 concurrent.future.Future 转换为 asyncio.Future(或相反)?
如果您所说的“转换”是指将一个转换成另一个,那么是可以的,尽管桥接阻抗不匹配可能需要一些工作。

要将concurrent.futures.Future转换为asyncio.Future,您可以调用asyncio.wrap_future。返回的asyncio future在asyncio事件循环中是可等待的,在底层线程future完成时将完成。这实际上就是run_in_executor实现方式

目前没有公开的功能可以直接将一个 asyncio future 转换为 concurrent.futures future,但是有一个 asyncio.run_coroutine_threadsafe 函数,它接受一个 协程,将其提交到事件循环中,并返回一个 concurrent future,当 asyncio future 完成时,该 future 也会完成。这可以用于有效地将任何 asyncio-awaitable future 转换为 concurrent future,例如:

def to_concurrent(fut, loop):
    async def wait():
        await fut
    return asyncio.run_coroutine_threadsafe(wait(), loop)

返回的future会像并发future一样运行,例如它的result()方法将会阻塞等待结果。需要注意的是,使用add_done_callback添加到并发future中的回调函数将在标记该future完成的线程中运行,这里是事件循环线程。这意味着如果您添加了一些回调函数,您需要小心不要在其实现中调用阻塞调用,以免阻塞事件循环。 请注意,调用run_coroutine_threadsafe需要事件循环在其他线程中实际运行。(例如,您可以启动一个后台线程并让它执行loop.run_forever。)

2

2

针对“从concurrent future到asyncio future”的部分,这里有一个我使用的实用工具。

from typing import List, Any
from concurrent.futures.thread import ThreadPoolExecutor
import asyncio


class AsyncThreadPool(ThreadPoolExecutor):
    _futures: List[asyncio.Future]
    _loop: asyncio.AbstractEventLoop

    def __init__(self, max_workers=None):
        super().__init__(max_workers)
        self._futures = []

    def queue(self, fn):
        self._loop = asyncio.get_event_loop()
        fut = self._loop.create_future()
        self._futures.append(fut)
        self.submit(self._entry, fn, fut)

    def queueAsync(self, coroutine):
        def newLoop():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            return loop.run_until_complete(coroutine)
        self.queue(newLoop)

    def _entry(self, fn, fut: asyncio.Future):
        try:
            result = fn()
            self._loop.call_soon_threadsafe(fut.set_result, result)
        except Exception as e:
            self._loop.call_soon_threadsafe(fut.set_exception, e)

    async def gather(self) -> List[Any]:
        return await asyncio.gather(*self._futures)

您可以像这样使用它:
with AsyncThreadPool() as pool:
    # Queue some sync function (will be executed on another thread)
    pool.queue(someHeavySyncFunction)
    # Queue a coroutine that will be executed on a new event loop running on another thread
    pool.queue(otherAsyncFunction())

    # Gather results (non blocking for your current loop)
    res: List[Any] = await pool.gather()

由于AsyncPool已经在事件循环内运行,因此queueAsync直接调用self._futures.append(asyncio.create_task(coroutine))是否更有效率?在asyncio中创建多个事件循环有点反模式。 - user4815162342

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接