Python ThreadPoolExecutor - 回调函数是否保证在提交的函数所在的线程中运行?

18

在线程池执行器(TPE)中,回调函数是否总是保证在与提交的函数相同的线程中运行?

例如,我使用以下代码进行了测试。我多次运行它,似乎funccallback总是在同一个线程中运行。

import concurrent.futures 
import random 
import threading 
import time 

executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) 

def func(x): 
    time.sleep(random.random()) 
    return threading.current_thread().name 

def callback(future): 
    time.sleep(random.random()) 
    x = future.result() 
    cur_thread = threading.current_thread().name 
    if (cur_thread != x): 
        print(cur_thread, x) 

print('main thread: %s' % threading.current_thread()) 
for i in range(10000): 
    future = executor.submit(func, i) 
    future.add_done_callback(callback) 

然而,当我移除了 time.sleep(random.random()) 语句时,似乎它出现了故障,即至少有一些 func 函数和 callbacks 没有 在同一个线程中运行。

对于我正在工作的项目,回调必须始终在提交的函数所在的同一线程上运行,因此我希望确保 TPE 可以保证这一点。(同时,没有随机睡眠的测试结果似乎让人困惑)。

我查看了执行器的源代码,发现在运行回调之前,我们似乎没有将线程切换到主线程。但是我只想确认一下。

1个回答

15

文档没有保证回调函数在哪个线程中运行。唯一的已记录保证是回调函数将在添加回调函数的进程所属的线程中运行,但这可能是任何线程,因为您正在使用ThreadPoolExecutor而不是ProcessPoolExecutor:

已添加的可调用项按添加顺序调用,并始终在添加它们的进程所属的线程中调用。


在当前的ThreadPoolExecutor实现中,回调函数执行的线程取决于添加回调时Future的状态以及Future是否被取消。这些是实现细节;您不应该依赖它们,因为它们可能在不同的Python实现或不同版本中有所不同,并且它们可能会在没有通知的情况下发生更改。
如果在Future完成后添加回调,则回调将在您调用add_done_callback的任何线程中执行。您可以通过查看add_done_callback源代码来了解这一点:
def add_done_callback(self, fn):
    """Attaches a callable that will be called when the future finishes.

    Args:
        fn: A callable that will be called with this future as its only
            argument when the future completes or is cancelled. The callable
            will always be called by a thread in the same process in which
            it was added. If the future has already completed or been
            cancelled then the callable will be called immediately. These
            callables are called in the order that they were added.
    """
    with self._condition:
        if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
            self._done_callbacks.append(fn)
            return
    fn(self)

如果Future的状态表明它已被取消或已完成,则fn会立即在当前执行线程中调用。否则,它将被添加到内部回调列表中,在Future完成时运行。
例如:
>>> def func(*args):
...  time.sleep(5)
...  print("func {}".format(threading.current_thread()))
>>> def cb(a): print("cb {}".format(threading.current_thread()))
... 
>>> fut = ex.submit(func)
>>> func <Thread(Thread-1, started daemon 140084551563008)>
>>> fut = e.add_done_callback(cb)
cb <_MainThread(MainThread, started 140084622018368)>

如果通过成功的cancel调用取消了一个未来,那么执行取消操作的线程将立即调用所有回调函数:
def cancel(self):
    """Cancel the future if possible.
    Returns True if the future was cancelled, False otherwise. A future
    cannot be cancelled if it is running or has already completed.
    """
    with self._condition:
        if self._state in [RUNNING, FINISHED]:
            return False

        if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
            return True

        self._state = CANCELLED
        self._condition.notify_all()

    self._invoke_callbacks()
    return True

否则,回调函数将由执行 future 任务的线程调用。

2
那么ProcessPoolExecutor呢?回调函数的get_ident也是不同的。 - Winand

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接