一旦期货交易开始,如何终止它们?

66

我正在使用新的 concurrent.futures 模块(也有 Python 2 的移植版)来进行一些简单的多线程 I/O。我在理解如何干净地终止使用该模块启动的任务方面遇到了问题。

请查看以下 Python 2/3 脚本,它重现了我所见到的行为:

#!/usr/bin/env python
from __future__ import print_function

import concurrent.futures
import time


def control_c_this():
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future1 = executor.submit(wait_a_bit, name="Jack")
        future2 = executor.submit(wait_a_bit, name="Jill")

        for future in concurrent.futures.as_completed([future1, future2]):
            future.result()

        print("All done!")


def wait_a_bit(name):
    print("{n} is waiting...".format(n=name))
    time.sleep(100)


if __name__ == "__main__":
    control_c_this()

在运行此脚本时,使用常规的Control-C键盘中断似乎无法干净地终止。我正在OS X上运行。

  • 对于Python 2.7,我必须从命令行中使用kill来杀死脚本。 Control-C被忽略。
  • 在Python 3.4上,如果你按两次Control-C,它就会起作用,但是然后会输出很多奇怪的堆栈跟踪。

我在网上找到的大部分文档都是关于如何使用旧的threading模块清理其中的线程。但似乎这些文档都不适用于此处。

concurrent.futures模块提供的所有方法(例如Executor.shutdown()Future.cancel())只有在Futures尚未启动或已完成时才能起作用,而在这种情况下是没有意义的。 我想立即中断Future。

我的用例很简单:当用户按Control-C时,脚本应该像任何表现良好的脚本一样立即退出。 这就是我想要的全部内容。

那么,使用concurrent.futures时,正确的获取此行为的方法是什么?


阅读有关Java的相关问题后,我发现通常不会杀死线程,因为这可能会使程序状态不一致。在我的情况下,我认为这不是一个问题,因为我只想让整个程序退出。还提到了设置一些共享变量,线程可以读取以知道何时自行终止。不确定这种方法是否适用于Python。 - Nick Chammas
提醒一下,即使 Ctrl+C 无效,Ctrl+Break 也可以使用。 - jedwards
1
@jedwards - 我正在使用Python 2,尝试使用Command + .(在OS X上显然是Control + Break),但似乎不起作用。实际上似乎相当于Control + C。 - Nick Chammas
4个回答

42
这有点痛苦。基本上,你的工作线程必须在主线程退出之前完成。除非它们完成,否则你无法退出。典型的解决方法是拥有一些全局状态,每个线程可以检查以确定是否应该继续工作。
这里是引用解释为什么要这样做。实质上,如果线程在解释器退出时退出,可能会发生糟糕的事情。
这是一个可行的示例。请注意,由于子线程的睡眠时间,Ctrl+C最多需要1秒才能传播。
#!/usr/bin/env python
from __future__ import print_function

import concurrent.futures
import time
import sys

quit = False
def wait_a_bit(name):
    while not quit:
        print("{n} is doing work...".format(n=name))
        time.sleep(1)

def setup():
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    future1 = executor.submit(wait_a_bit, "Jack")
    future2 = executor.submit(wait_a_bit, "Jill")

    # main thread must be doing "work" to be able to catch a Ctrl+C 
    # http://www.luke.maurits.id.au/blog/post/threads-and-signals-in-python.html
    while (not (future1.done() and future2.done())):
        time.sleep(1)

if __name__ == "__main__":
    try:
        setup()
    except KeyboardInterrupt:
        quit = True

1
你不需要睡觉,你只需要让它们检查是否应该退出。 - cdosborn
1
这个技巧在我使用ThreadPoolExecutor时有效,但在使用ProcessPoolExecutor时无效。在尝试跨进程共享全局变量时是否有什么需要注意的地方?我是否需要将“退出”标志存储在磁盘上或其他地方? - Gustavo Bezerra
9
进程不共享变量,需要使用队列或信号量进行通信。 - mdurant
2
据我所知,睡眠通常不会以任何相关数量消耗CPU时间。尝试创建10k个线程,它们立即进入一天的睡眠状态;设置完成后,您将看不到任何CPU使用情况。因此,在大多数应用程序中,这应该是可以接受的。 - Felix D.
3
这个答案似乎有两个方面是缺失的。第一,为什么反复按CTRL-C确实会更快地关闭所有东西。第二,您引用的语录在考虑信号时具有某种不太现实的期望:“工人在评估工作项时可能会被杀死,如果正在评估的可调用对象具有外部副作用,例如写入文件,则可能会很糟糕。” 如果程序是单线程的,CTRL-C通常会对整个执行产生这些影响。那么只需从主线程向所有守护进程传播SIGINT,然后join()它们呢? - init_js
显示剩余2条评论

14
我遇到了这个问题,但我遇到的问题是许多未来(成千上万)正在等待运行,只是按下Ctrl+C并没有真正退出。我正在使用concurrent.futures.wait来运行一个进度循环,并且需要添加一个try ... except KeyboardInterrupt来处理取消未完成的未来任务。
POLL_INTERVAL = 5
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
    futures = [pool.submit(do_work, arg) for arg in large_set_to_do_work_over]
    # next line returns instantly
    done, not_done = concurrent.futures.wait(futures, timeout=0)
    try:
        while not_done:
            # next line 'sleeps' this main thread, letting the thread pool run
            freshly_done, not_done = concurrent.futures.wait(not_done, timeout=POLL_INTERVAL)
            done |= freshly_done
            # more polling stats calculated here and printed every POLL_INTERVAL seconds...
    except KeyboardInterrupt:
        # only futures that are not done will prevent exiting
        for future in not_done:
            # cancel() returns False if it's already done or currently running,
            # and True if was able to cancel it; we don't need that return value
            _ = future.cancel()
        # wait for running futures that the above for loop couldn't cancel (note timeout)
        _ = concurrent.futures.wait(not_done, timeout=None)

如果您对准确追踪完成的工作和未完成的工作不感兴趣(即不需要进度循环),您可以将第一个等待调用(具有 timeout=0 的调用)替换为 not_done = futures,并仍然保持 while not_done: 的逻辑。

使用返回值,可能可以使 for future in not_done: 的取消循环行为不同(或者编写为推导式),但是等待已完成或已取消的任务实际上并不等待 - 它会立即返回。最后一个带有 timeout=Nonewait 确保池中正在运行的作业真正完成。

再次强调,这仅在被调用的 do_work 最终能够在合理的时间内返回时才能正确运行。对我来说没问题 - 实际上,我希望确保一旦开始执行 do_work,它就会完整地运行。如果 do_work 是“无限”的,那么您将需要像 cdosborn's answer 那样使用一个对所有线程可见的变量,向它们发出停止信号。


请查看此链接:https://dev59.com/-XUOtIcB2Jgan1zn3f1x#72942970 - Erik Aronesty
1
这正是我正在寻找的!我创建了一个最小工作示例。分享出来,以防对任何人有所帮助。 - rusheb

3
晚到派对,但我刚遇到了同样的问题。
我想立即终止我的程序,不管发生了什么。我不需要比Linux所做的干净关闭更多。
我发现,在KeyboardInterrupt异常处理程序中用os.kill(os.getpid(), 9)替换geitda的代码后,第一次按下Ctrl+C后立即退出。

请不要将"谢谢"作为答案。一旦你有足够的声望,你就可以投票支持有用的问题和答案了。- 来自审核 - user11717481

0
main = str(os.getpid())
def ossystem(c):
    return subprocess.Popen(c, shell=True, stdout=subprocess.PIPE).stdout.read().decode("utf-8").strip()
def killexecutor():
    print("Killing")
    pids = ossystem('ps -a | grep scriptname.py').split('\n')
    for pid in pids:
        pid = pid.split(' ')[0].strip()
        if(str(pid) != main):
            os.kill(int(pid), 9)


...
    killexecutor()

请在您的代码中添加一些描述。 - Julia Meshcheryakova
这真是个糟糕的主意 - E.Serra

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接