Python的`futures`模块中的DummyExecutor

25

Python的包允许我们使用和并行处理任务。

然而,为了调试,有时候临时用一个虚假的替代品取代真正的并行处理是有用的。这个替代品将在主线程中以串行方式执行任务,而不会生成任何线程或进程。

是否有实现一个的地方?


@mata 我不这么认为,那会创建一个线程,但它仍然会与主线程分离。 - Ram Rachum
当然,你是正确的。但是实现一个Executor直接调用可调用对象并返回Future对象不应该太复杂。可以参考ThreadPoolExecutor - mata
1
在你开始做之前,它总是看起来很简单,但在你完成之后并不总是如此。如果已经有人实现了这个功能,最好我使用他们准备好的实现。 - Ram Rachum
3个回答

24

像这样就可以:

from concurrent.futures import Future, Executor
from threading import Lock


class DummyExecutor(Executor):

    def __init__(self):
        self._shutdown = False
        self._shutdownLock = Lock()

    def submit(self, fn, *args, **kwargs):
        with self._shutdownLock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = Future()
            try:
                result = fn(*args, **kwargs)
            except BaseException as e:
                f.set_exception(e)
            else:
                f.set_result(result)

            return f

    def shutdown(self, wait=True):
        with self._shutdownLock:
            self._shutdown = True


if __name__ == '__main__':

    def fnc(err):
        if err:
            raise Exception("test")
        else:
            return "ok"

    ex = DummyExecutor()
    print(ex.submit(fnc, True))
    print(ex.submit(fnc, False))
    ex.shutdown()
    ex.submit(fnc, True) # raises exception

在这种情况下可能不需要锁定,但使用锁定也不会有害。


4
使用此方法来模拟您的ThreadPoolExecutor。
class MockThreadPoolExecutor():
    def __init__(self, **kwargs):
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass

    def submit(self, fn, *args, **kwargs):
        # execute functions in series without creating threads
        # for easier unit testing
        result = fn(*args, **kwargs)
        return result

    def shutdown(self, wait=True):
        pass

if __name__ == "__main__":
    def sum(a, b):
        return a + b

    with MockThreadPoolExecutor(max_workers=3) as executor:
        future_result = list()
        for i in range(5):
            future_result.append(executor.submit(sum, i + 1, i + 2))

这是一个不错的解决方案,但我正在使用它来测试代码,然后对由ThreadPoolExecutor.submit返回的Future对象执行某些操作。我的解决方案是添加一个名为MockFuture的小类,其中包含一个result方法,并使submit返回该实例而不是直接返回结果。 - Jack Brounstein

0

来自concurrent.futures包的线程池是急切的(这当然是你想要的,意味着它们会尽快开始计算 - 在pool.submit()调用和相关的future.result()方法返回之间的某个时间点)。

从同步代码的角度来看,你有两个选择 - 要么在pool.submit()调用时计算任务结果,要么在future.result()检索时计算。

我发现第二种方法更自然,因为它更好地模拟了pool.map()的“非阻塞”特性 - 结果可以在计算完成后一个接一个地获取(而不是等到所有结果都准备好)。

这是我的代码(需要额外实现DummyFuture):

from concurrent.futures import Executor
from threading import Lock
from functools import partial


class DummyFuture():
    def __init__(self, calculation) -> None:
        self.calculation = calculation
    def result(self, timeout=None):
        return self.calculation()
    def cancel(self):
        pass


class DummyExecutor(Executor):

    def __init__(self):
        self._shutdown = False
        self._shutdown_lock = Lock()

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            return DummyFuture(partial(fn, *args, **kwargs))

    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._shutdown = True


(我必须要感谢mata的回答,因为它是我实现的最初起点。)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接