Python的包允许我们使用和并行处理任务。
然而,为了调试,有时候临时用一个虚假的替代品取代真正的并行处理是有用的。这个替代品将在主线程中以串行方式执行任务,而不会生成任何线程或进程。
是否有实现一个的地方?
Python的包允许我们使用和并行处理任务。
然而,为了调试,有时候临时用一个虚假的替代品取代真正的并行处理是有用的。这个替代品将在主线程中以串行方式执行任务,而不会生成任何线程或进程。
是否有实现一个的地方?
像这样就可以:
from concurrent.futures import Future, Executor
from threading import Lock
class DummyExecutor(Executor):
def __init__(self):
self._shutdown = False
self._shutdownLock = Lock()
def submit(self, fn, *args, **kwargs):
with self._shutdownLock:
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
f = Future()
try:
result = fn(*args, **kwargs)
except BaseException as e:
f.set_exception(e)
else:
f.set_result(result)
return f
def shutdown(self, wait=True):
with self._shutdownLock:
self._shutdown = True
if __name__ == '__main__':
def fnc(err):
if err:
raise Exception("test")
else:
return "ok"
ex = DummyExecutor()
print(ex.submit(fnc, True))
print(ex.submit(fnc, False))
ex.shutdown()
ex.submit(fnc, True) # raises exception
在这种情况下可能不需要锁定,但使用锁定也不会有害。
class MockThreadPoolExecutor():
def __init__(self, **kwargs):
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
pass
def submit(self, fn, *args, **kwargs):
# execute functions in series without creating threads
# for easier unit testing
result = fn(*args, **kwargs)
return result
def shutdown(self, wait=True):
pass
if __name__ == "__main__":
def sum(a, b):
return a + b
with MockThreadPoolExecutor(max_workers=3) as executor:
future_result = list()
for i in range(5):
future_result.append(executor.submit(sum, i + 1, i + 2))
ThreadPoolExecutor.submit
返回的Future
对象执行某些操作。我的解决方案是添加一个名为MockFuture
的小类,其中包含一个result
方法,并使submit
返回该实例而不是直接返回结果。 - Jack Brounstein来自concurrent.futures
包的线程池是急切的(这当然是你想要的,意味着它们会尽快开始计算 - 在pool.submit()
调用和相关的future.result()
方法返回之间的某个时间点)。
从同步代码的角度来看,你有两个选择 - 要么在pool.submit()
调用时计算任务结果,要么在future.result()
检索时计算。
我发现第二种方法更自然,因为它更好地模拟了pool.map()
的“非阻塞”特性 - 结果可以在计算完成后一个接一个地获取(而不是等到所有结果都准备好)。
这是我的代码(需要额外实现DummyFuture
):
from concurrent.futures import Executor
from threading import Lock
from functools import partial
class DummyFuture():
def __init__(self, calculation) -> None:
self.calculation = calculation
def result(self, timeout=None):
return self.calculation()
def cancel(self):
pass
class DummyExecutor(Executor):
def __init__(self):
self._shutdown = False
self._shutdown_lock = Lock()
def submit(self, fn, /, *args, **kwargs):
with self._shutdown_lock:
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
return DummyFuture(partial(fn, *args, **kwargs))
def shutdown(self, wait=True, *, cancel_futures=False):
with self._shutdown_lock:
self._shutdown = True
Executor
直接调用可调用对象并返回Future
对象不应该太复杂。可以参考ThreadPoolExecutor
。 - mata