Python: how can concurrent.futures tasks be made cancelable?

27

Python's concurrent.futures and its ProcessPoolExecutor provide a neat interface to schedule and monitor tasks. Futures even provide a .cancel() method:

cancel(): Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

Unfortunately, in a similar question (about asyncio) an answer claims that running tasks cannot be cancelled, citing this documentation snippet. But the docs do not say that; they only say a call cannot be cancelled if it is currently executing *and* uncancelable.

Submitting multiprocessing.Events to the processes is not trivial either (passing one as a parameter raises a RuntimeError, as with multiprocessing.Process).

What am I trying to do? I want to partition a search space and run a task on every partition. But only one solution is needed, and the tasks are CPU-bound. Is there an actually comfortable way to accomplish this that does not offset the gains of using a ProcessPool?

Example:

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 135135515:
            return elem
    return False

futures = []
# used to create the partitions
steps = 100000000
with ProcessPoolExecutor(max_workers=4) as pool:
    for i in range(4):
        # run 4 tasks with a partition, but only *one* solution is needed
        partition = range(i*steps,(i+1)*steps)
        futures.append(pool.submit(m_run, partition))

    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    for d in done:
        print(d.result())

    print("---")
    for d in not_done:
        # cancel() returns False and result() still returns for all futures
        print("Cancel: "+str(d.cancel()))
        print("Result: "+str(d.result()))

You could try setting the Event as a global variable instead of passing it as a parameter, see https://dev59.com/BXI-5IYBdhLWcg3wxruQ. - niemmi
@niemmi Thanks for the hint. I will probably try that workaround, but it feels badly designed because of the calls across different modules. - Ketzu
Maybe this all relates to the fact that there is no immediate-cancellation POSIX API: https://dev59.com/questions/kHI95IYBdhLWcg3w8iv1 - Ciro Santilli OurBigBook.com
4 Answers

18

Unfortunately, running Futures cannot be cancelled. I believe the core reason is to guarantee the same API across the different implementations (it is not possible to interrupt a running thread or coroutine).
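This is easy to verify with the standard library itself. A small sketch (a ThreadPoolExecutor is used for brevity; ProcessPoolExecutor futures follow the same rule): cancel() only succeeds while the future is still queued.

```python
import time
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(time.sleep, 0.5)   # picked up immediately
    pending = pool.submit(time.sleep, 0.5)   # waits behind the first task
    time.sleep(0.1)                          # let the first task start
    print(running.cancel())                  # False: already executing
    print(pending.cancel())                  # True: still pending
```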

The Pebble library was designed to overcome this and other limitations.

from pebble import ProcessPool

def function(foo, bar=0):
    return foo + bar

with ProcessPool() as pool:
    future = pool.schedule(function, args=[1])

    # if running, the container process will be terminated 
    # a new process will be started consuming the next task
    future.cancel()  

4
It is handy to know that pebble's futures inherit from the concurrent.futures ones. Therefore, many methods provided by concurrent.futures can be applied to pebble futures even though pebble does not implement them itself. For example, concurrent.futures' as_completed also works with pebble. Hence, switching to pebble may require nothing more than adding an import and changing the names of ProcessPoolExecutor and pool.submit. - Samufi
3
This may be obvious, but I want to point out that if you are using a ProcessPool, you are no longer using multiple threads but multiple processes. Many people may not care about the difference, but it is worth at least knowing what you are doing. - Stephen
The OP uses ProcessPoolExecutor, so using ProcessPool is fair. If you want a replacement for ThreadPoolExecutor, pebble also has a ThreadPool. - fireattack

15

I don't know why concurrent.futures.Future has no .kill() method, but you can achieve what you want by shutting down the pool with pool.shutdown(wait=False) and killing the remaining child processes by hand.
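A related note (not part of the original answer): since Python 3.9, shutdown() accepts cancel_futures=True, which cancels everything still waiting in the queue. Tasks that are already running are still left alone, which is why the kill step is needed. A quick sketch (ThreadPoolExecutor is used only to keep it self-contained; the parameter works for ProcessPoolExecutor too):

```python
import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)
futures = [pool.submit(time.sleep, 0.2) for _ in range(4)]
time.sleep(0.05)                                # the first task is running now
pool.shutdown(wait=True, cancel_futures=True)   # cancels the three queued tasks
print([f.cancelled() for f in futures])         # [False, True, True, True]
```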

Create a function that kills the child processes:

import signal, psutil

def kill_child_processes(parent_pid, sig=signal.SIGTERM):
    try:
        parent = psutil.Process(parent_pid)
    except psutil.NoSuchProcess:
        return
    children = parent.children(recursive=True)
    for process in children:
        process.send_signal(sig)

Run the code until you get the first result, then kill all remaining child processes:

import os
from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 135135515:
            return elem
    return False

futures = []
# used to create the partitions
steps = 100000000
pool = ProcessPoolExecutor(max_workers=4)
for i in range(4):
    # run 4 tasks with a partition, but only *one* solution is needed
    partition = range(i*steps,(i+1)*steps)
    futures.append(pool.submit(m_run, partition))

done, not_done = wait(futures, timeout=3600, return_when=FIRST_COMPLETED)

# Shut down pool
pool.shutdown(wait=False)

# Kill remaining child processes
kill_child_processes(os.getpid())

1

In one of my programs, I needed to kill all running executor processes when the .shutdown() method is called. Since this is not possible with the standard ProcessPoolExecutor, I rolled my own. I also fixed the wrong running state reported by the future objects:

import sys
import functools
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures._base import RUNNING


def _callable_wrapper(is_running, fn, *args, **kwargs):
    is_running.value = True
    fn(*args, **kwargs)
    is_running.value = False


def _future_running_override(future, is_running):
    return future._state == RUNNING and is_running.value


class StoppableProcessPoolExecutor(ProcessPoolExecutor):
    """A concurrent.futures.ProcessPoolExecutor that kills running processes on
    shutdown.
    This also fix the wrong running state of futures. See
    https://bugs.python.org/issue37276
    """

    def __init__(self, *args, **kwargs):
        self._state_manager = multiprocessing.Manager()
        ProcessPoolExecutor.__init__(self, *args, **kwargs)

    def shutdown(self, *args, **kwargs):
        processes = self._processes

        # Python < 3.9: we must wait, else we get an OSError:
        # https://bugs.python.org/issue36281
        if sys.version_info.major >= 3 and sys.version_info.minor < 9:
            kwargs["wait"] = True

        for pid, process in processes.items():
            process.kill()
        ProcessPoolExecutor.shutdown(self, *args, **kwargs)
        self._state_manager.shutdown()

    shutdown.__doc__ = ProcessPoolExecutor.shutdown.__doc__

    def submit(self, fn, *args, **kwargs):
        is_running = self._state_manager.Value(bool, False)
        future = ProcessPoolExecutor.submit(
            self,
            functools.partial(_callable_wrapper, is_running, fn),
            *args,
            **kwargs,
        )
        # Monkey patch future.running to return the real running state
        future.running = functools.partial(_future_running_override, future, is_running)
        return future

    submit.__doc__ = ProcessPoolExecutor.submit.__doc__

Original source: https://github.com/flozz/yoga-image-optimizer/blob/master/yoga_image_optimizer/stoppable_process_pool_executor.py

Hope this helps someone... :)


1
I found your question interesting, so here are my findings.
I found that the .cancel() method behaves exactly as stated in the Python documentation. As for concurrently running functions, they cannot be cancelled even when told to. If my findings are correct, I would argue that Python needs a more effective .cancel() method.
Run the code below to check my findings.
from concurrent.futures import ProcessPoolExecutor, as_completed
from time import time 

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 3351355150:
            return elem  # return also terminates the loop, no break needed
    return False

start = time()
futures = []
# used to create the partitions
steps = 1000000000
with ProcessPoolExecutor(max_workers=4) as pool:
    for i in range(4):
        # run 4 tasks with a partition, but only *one* solution is needed
        partition = range(i*steps,(i+1)*steps)
        futures.append(pool.submit(m_run, partition))

    ### New Code: Start ### 
    for f in as_completed(futures):
        print(f.result())
        if f.result():
            print('break')
            break

    for f in futures:
        print(f, 'running?',f.running())
        if f.running():
            f.cancel()
            print('Cancelled? ',f.cancelled())

    print('New Instruction Ended at = ', time()-start )
print('Total Compute Time = ', time()-start )

Update: the concurrent processes can be force-terminated via the shell, but the consequence is that the main Python program will terminate too. If that is not an issue for you, try the code below.

You have to add the code below between the last two print statements to see this for yourself. Note: this code only works if you are not running any other python3 programs.

import subprocess, os, signal
result = subprocess.run(['ps', '-C', 'python3', '-o', 'pid='],
                        stdout=subprocess.PIPE).stdout.decode('utf-8').split()
print('result =', result)
for i in result:
    print('PID = ', i)
    if i != result[0]:
        os.kill(int(i), signal.SIGKILL)
        try:
            os.kill(int(i), 0)
            raise Exception("""wasn't able to kill the process
                               HINT: use signal.SIGKILL or signal.SIGABRT""")
        except OSError:
            continue
