如何在Python多线程中中断I/O操作?

10
例如,
with open("foo") as f:
  f.read()

(但它可能是文件写入、DNS查找或其他许多I/O操作之一。)

如果我在读取时中断此程序(SIGINT),则I/O操作将停止,并抛出KeyboardInterrupt,并运行终结器。

然而,如果这发生在除主线程之外的线程上,则不会中断I/O操作。

那么……如何在另一个线程上中断I/O操作(类似于在主线程上中断)?


1
根据文档,“Python信号处理程序始终在主解释器的主Python线程中执行,即使信号是在另一个线程中接收到的”,我认为这不是你可以做的事情。- https://docs.python.org/3/library/signal.html#signals-and-threads - Macattack
@Macattack,所以如果你在非主线程上执行I/O操作,你只需要提交(commit)就可以了,是吗? - Paul Draper
3个回答

2

键盘中断事件总是在主线程上捕获,它们不会直接影响其他线程(在这种情况下,它们不会因为 Ctrl+C 而被中断)。src1 src2 (在注释中)

这里有一个长时间的 IO 绑定操作示例,我们有时间在它完成之前终止它。KeyboardInterrupt 按预期工作。

import random
import threading


def long_io(file_name):
    with open(file_name, "w") as f:
        i = 0
        while i < 999999999999999999999999999:
            f.write(str(random.randint(0, 99999999999999999999999999)))
            i += 1


t = threading.Thread(target=long_io, args=("foo",), daemon=True)
t.start()

# keep the main thread alive, listening to possible KeyboardInterupts
while t.is_alive():
    t.join(1)  # try to join for 1 second, this gives a small window between joins in which the KeyboardInterrupt can rise

注意:
  • 线程被标记为daemon;这样,在出现KeyboardInterrupt时,主线程不会等待IO完成,而是直接终止它。你可以使用非守护线程(推荐)如此处所述,但对于这个例子,直接终止它们就足够了。

线程可以被标记为“守护线程”。这个标志的重要性在于,当只剩下守护线程时,整个Python程序将退出。初始值从创建线程继承。该标志可以通过daemon属性或daemon构造函数参数设置。守护线程在关闭时会突然停止。它们的资源(如打开文件、数据库事务等)可能无法正确释放。如果您希望您的线程正常停止,请使它们成为非守护线程,并使用适当的信号机制,如Event。src

将子线程设置为守护线程,这意味着其父线程(即主线程)在退出时会终止它(只有非守护线程在其父线程退出时才不会被终止而是等待加入)。我们保持主线程处于活动状态,以便它不会立即完成,而是等待子线程完成。否则,主线程(负责检测键盘中断的线程)将被关闭(如果子线程是守护线程,则会终止它,如果不是守护线程,则等待加入)。我们本可以使用简单的t.join()来实现,但我们没有这样做。为什么?因为KeyboardInterrupt也会受到影响,只有在完成join后才会引发它。
主线程在thread.join()这一行上仍然被阻塞。

0

线程内的异常会传播到主线程。以下是一个例子

import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def func(raise_exc):
    print("Running in {}".format(threading.current_thread().name))
    if raise_exc:
        time.sleep(1)
        raise Exception

    time.sleep(3)


with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(func, False), executor.submit(func, True)]

    while len(futures) > 0:
        for fut in futures[:]:
            try:
                # check if thread has finished its work, with timeout
                result = fut.result(timeout=1)

            except TimeoutError as exc:
                print("Timeout.. retry in thread {}".format(threading.current_thread().name))
            except Exception as exc:
                print("Exception was thrown in thread {}, exiting".format(threading.current_thread().name))
                # we remove this fut from the list, as it's finished
                futures.remove(fut)

            else:
                # op succeeded
                print("Thread finished successfully {}".format(threading.current_thread().name))
                futures.remove(fut)


print("Bye")

输出结果为

➜ python3 exception-in-thread.py
Running in ThreadPoolExecutor-0_0
Running in ThreadPoolExecutor-0_1
Timeout.. retry in thread MainThread
Exception was thrown in thread MainThread, exiting
Timeout.. retry in thread MainThread
Thread finished successfully MainThread
Bye

但是,正如您所看到的,一个线程中的异常不会影响其他线程。如果这是您想要的,您需要在主线程中捕获信号并将其发送到其他活动线程。

您可以使用全局变量来指示我们是否处于RUNNING状态。然后,当异常传播时,我们捕获它并更新RUNNING状态。要向其他线程发出信号,我们在threadpool对象上调用shutdown。这就是它的样子:

import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def func(raise_exc):
    print("Running in {}".format(threading.current_thread().name))
    if raise_exc:
        time.sleep(1)
        raise Exception

    time.sleep(3)


RUNNING = True
LOCK = threading.Lock()

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(func, False), executor.submit(func, True)]

    while RUNNING:
        for fut in futures[:]:

            if not RUNNING:
                break

            try:
                # check if thread has finished its work, with timeout
                result = fut.result(timeout=1)

            except TimeoutError as exc:
                print("Timeout.. retry in thread {}".format(threading.current_thread().name))
            except Exception as exc:
                print("Exception was thrown in thread {}, exiting".format(threading.current_thread().name))
                # we remove this fut from the list, as it's finished
                with LOCK:
                    print("Stop execution due to exception..")
                    RUNNING = False
                    executor.shutdown(wait=False)

            else:
                # op succeeded
                print("Thread finished successfully {}".format(threading.current_thread().name))
                futures.remove(fut)


print("Bye")

哪些输出

➜ python3 exception-in-thread.py
Running in ThreadPoolExecutor-0_0
Running in ThreadPoolExecutor-0_1
Timeout.. retry in thread MainThread
Exception was thrown in thread MainThread, exiting
Stop execution due to exception..
Bye

请注意,我们使用锁来保护全局变量,因为可能会有多个线程同时访问它。

0

来自https://docs.python.org/3/library/signal.html#signals-and-threads: "Python信号处理程序始终在主解释器的主Python线程中执行,即使信号是在另一个线程中接收到的。这意味着信号不能用作线程间通信的手段。您可以使用线程模块中的同步原语代替。"

在这种情况下,我们可以使用threading模块中的“事件对象”来同步线程。

import logging
import threading
import time
import signal


def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    logging.debug('wait_for_event starting')
    while not e.is_set():
        # event_is_set = e.wait(0.3)
        pass
    
    if e.is_set():
        logging.debug("Keyboard interrupt received from main thread")
        return
    

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

e = threading.Event()

def signal_handler(sig, frame):
    global e
    logging.debug("Keyboard interrupt receieved by means of Ctrl + C")
    e.set()


t1 = threading.Thread(
    name='block',
    target=wait_for_event,
    args=(e,),
)
t1.start()

signal.signal(signal.SIGINT, signal_handler)
print("press Ctrl+C to stop")
signal.pause()

结果:

(block     ) wait_for_event starting
press Ctrl+C to stop
^C(MainThread) Keyboard interrupt receieved by means of Ctrl + C
(block     ) Keyboard interrupt received from main thread

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接