Python ProcessPoolExecutor - 如何终止任务队列?

3

这是我在stackoverflow上的第一个问题。我大部分能在这里找到我需要了解的内容。顺便说一句,非常感谢。

然而,如果我尝试终止我的ProcessPoolExecutor,它将会一直工作到生成的整个队列完成(..我想是这样的吧?)。有没有简单的方法可以立即清除Processpoolexecutor的队列呢?

from concurrent.futures import ProcessPoolExecutor
from time import sleep
from random import randint


def something_fancy():
    sleep(randint(0, 5))
    return 'im back!'


class Work:
    def __init__(self):
        self.exe = ProcessPoolExecutor(4)

    def start_procs(self):
        for i in range(300):
            t = self.exe.submit(something_fancy)
            t.add_done_callback(self.done)

    def done(self, f):
        print f.result()

    def kill(self):
        self.exe.shutdown()


if __name__ == '__main__':
    work_obj = Work()
    work_obj.start_procs()
    sleep(5)
    work_obj.kill()

我的目标是生成一个队列,由4个进程处理,并且队列中应有300个项目。在5秒后,程序应自动退出。

需要使用进程来处理该任务,因为GIL的存在。

2个回答

1
使用shutdown(wait=False)可以更快地返回。默认值为True。否则,它还提供了一个.Cancel(),如果不可取消,则返回False。

链接到文档

它仍将完成所有处理期货:

如果waitTrue,则此方法将在所有待处理的期货执行完毕并释放与执行程序关联的资源之前不会返回。

如果waitFalse,则此方法将立即返回,并且当所有待处理的期货执行完毕时,与执行程序关联的资源将被释放。无论wait的值如何,整个Python程序都不会退出,直到所有待处理的期货都执行完毕。

如果您有固定的时间,应该提供超时:
map(func, *iterables, timeout=None, chunksize=1)

这个可以是浮点数或整数,以秒为单位指定


1
文档现在明确指定超时时间以秒为单位设置(https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map) - Théo Rubenach

0

谢谢Patrick

有了提示,我能够通过将Futures添加到列表中并手动调整队列大小来取消每个进程。如果没有这个提示,仍然会启动太多的进程。

似乎没有API可以调整队列大小、暂停执行或删除进程队列。

然而,实现这一点的唯一方法是在线程中运行主对象,以便主脚本可以随时终止它。我仍在尝试捕获“CancelledError”。

对我来说看起来很“不干净”,也不符合Pythonic的风格。我会接受任何其他建议。非常感谢。

from concurrent.futures import ProcessPoolExecutor, CancelledError
from time import sleep
from random import randint
from threading import Thread


def something_fancy():
    sleep(randint(0, 5))
    return 'im back!'


class Work:
    def __init__(self):
        self.exe = ProcessPoolExecutor(4)
        self.futures = []
        self.max_queue = 50
        self.killed = False

    def start_procs(self):
        for i in range(200000):
            while not self.killed:
                if len(self.futures) <= self.max_queue:
                    t = self.exe.submit(something_fancy)
                    t.add_done_callback(self.done)
                    self.futures.append(t)
                    break

    def done(self, f):
        print f.result()
        self.futures.remove(f)

    def kill(self):
        self.killed = True
        for future in self.futures:
            try:
                future.cancel()
            except CancelledError, e:
                print e


if __name__ == '__main__':
    work_obj = Work()
    Thread(target=work_obj.start_procs).start()
    sleep(5)
    work_obj.kill()

编辑

from concurrent.futures import ProcessPoolExecutor, CancelledError
from time import sleep
from random import randint
from threading import Thread


def something_fancy():
    sleep(0.5)
    return 'Hello World, Future was running!'


class Work:
    def __init__(self):
        cpu_usage = 4
        self.exe = ProcessPoolExecutor(cpu_usage)
        self.futures = []
        self.max_queue = cpu_usage*3
        self.stop = False
        self.paused = False

    def start_procs(self):
        for i in range(200000):
            while not self.stop:
                if len(self.futures) <= self.max_queue:
                    if not self.paused:
                        t = self.exe.submit(something_fancy)
                        t.add_done_callback(self._done)
                        self.futures.append(t)
                        break

    def _done(self, f):
        print f.result()
        self.futures.remove(f)

    def pause(self):
        self.paused = False if self.paused else True

    def shutdown(self):
        self.stop = True
        for future in self.futures:
            try:
                future.cancel()
            except CancelledError, e:
                print e


if __name__ == '__main__':
    work_obj = Work()
    Thread(target=work_obj.start_procs).start()
    print 'Started'
    sleep(5)
    work_obj.pause()
    print 'Paused'
    sleep(5)
    work_obj.pause()
    print 'Continue'
    sleep(5)
    work_obj.shutdown()
    print 'Shutdown'

这个可以用 - 但是仍然无法捕获CancelledError,而且还相当不干净。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接