with open("foo") as f:
f.read()
(但它可能是文件写入、DNS查找或其他许多I/O操作之一。)
如果我在读取时中断此程序(SIGINT),则I/O操作将停止,并抛出KeyboardInterrupt
,并运行终结器。
然而,如果这发生在除主线程之外的线程上,则不会中断I/O操作。
那么……如何在另一个线程上中断I/O操作(类似于在主线程上中断)?
with open("foo") as f:
f.read()
(但它可能是文件写入、DNS查找或其他许多I/O操作之一。)
如果我在读取时中断此程序(SIGINT),则I/O操作将停止,并抛出KeyboardInterrupt
,并运行终结器。
然而,如果这发生在除主线程之外的线程上,则不会中断I/O操作。
那么……如何在另一个线程上中断I/O操作(类似于在主线程上中断)?
键盘中断事件总是在主线程上捕获,它们不会直接影响其他线程(在这种情况下,它们不会因为 Ctrl+C
而被中断)。src1 src2 (在注释中)
这里有一个长时间的 IO 绑定操作示例,我们有时间在它完成之前终止它。KeyboardInterrupt 按预期工作。
import random
import threading
def long_io(file_name):
with open(file_name, "w") as f:
i = 0
while i < 999999999999999999999999999:
f.write(str(random.randint(0, 99999999999999999999999999)))
i += 1
t = threading.Thread(target=long_io, args=("foo",), daemon=True)
t.start()
# keep the main thread alive, listening to possible KeyboardInterupts
while t.is_alive():
t.join(1) # try to join for 1 second, this gives a small window between joins in which the KeyboardInterrupt can rise
daemon
;这样,在出现KeyboardInterrupt时,主线程不会等待IO完成,而是直接终止它。你可以使用非守护线程(推荐)如此处所述,但对于这个例子,直接终止它们就足够了。将子线程设置为守护线程,这意味着其父线程(即主线程)在退出时会终止它(只有非守护线程在其父线程退出时才不会被终止而是等待加入)。我们保持主线程处于活动状态,以便它不会立即完成,而是等待子线程完成。否则,主线程(负责检测键盘中断的线程)将被关闭(如果子线程是守护线程,则会终止它,如果不是守护线程,则等待加入)。我们本可以使用简单的t.join()来实现,但我们没有这样做。为什么?因为KeyboardInterrupt也会受到影响,只有在完成join后才会引发它。线程可以被标记为“守护线程”。这个标志的重要性在于,当只剩下守护线程时,整个Python程序将退出。初始值从创建线程继承。该标志可以通过daemon属性或daemon构造函数参数设置。守护线程在关闭时会突然停止。它们的资源(如打开文件、数据库事务等)可能无法正确释放。如果您希望您的线程正常停止,请使它们成为非守护线程,并使用适当的信号机制,如Event。src
线程内的异常会传播到主线程。以下是一个例子
import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError
def func(raise_exc):
print("Running in {}".format(threading.current_thread().name))
if raise_exc:
time.sleep(1)
raise Exception
time.sleep(3)
with ThreadPoolExecutor(max_workers=2) as executor:
futures = [executor.submit(func, False), executor.submit(func, True)]
while len(futures) > 0:
for fut in futures[:]:
try:
# check if thread has finished its work, with timeout
result = fut.result(timeout=1)
except TimeoutError as exc:
print("Timeout.. retry in thread {}".format(threading.current_thread().name))
except Exception as exc:
print("Exception was thrown in thread {}, exiting".format(threading.current_thread().name))
# we remove this fut from the list, as it's finished
futures.remove(fut)
else:
# op succeeded
print("Thread finished successfully {}".format(threading.current_thread().name))
futures.remove(fut)
print("Bye")
输出结果为
➜ python3 exception-in-thread.py
Running in ThreadPoolExecutor-0_0
Running in ThreadPoolExecutor-0_1
Timeout.. retry in thread MainThread
Exception was thrown in thread MainThread, exiting
Timeout.. retry in thread MainThread
Thread finished successfully MainThread
Bye
但是,正如您所看到的,一个线程中的异常不会影响其他线程。如果这是您想要的,您需要在主线程中捕获信号并将其发送到其他活动线程。
您可以使用全局变量来指示我们是否处于RUNNING状态。然后,当异常传播时,我们捕获它并更新RUNNING状态。要向其他线程发出信号,我们在threadpool对象上调用shutdown。这就是它的样子:
import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError
def func(raise_exc):
print("Running in {}".format(threading.current_thread().name))
if raise_exc:
time.sleep(1)
raise Exception
time.sleep(3)
RUNNING = True
LOCK = threading.Lock()
with ThreadPoolExecutor(max_workers=2) as executor:
futures = [executor.submit(func, False), executor.submit(func, True)]
while RUNNING:
for fut in futures[:]:
if not RUNNING:
break
try:
# check if thread has finished its work, with timeout
result = fut.result(timeout=1)
except TimeoutError as exc:
print("Timeout.. retry in thread {}".format(threading.current_thread().name))
except Exception as exc:
print("Exception was thrown in thread {}, exiting".format(threading.current_thread().name))
# we remove this fut from the list, as it's finished
with LOCK:
print("Stop execution due to exception..")
RUNNING = False
executor.shutdown(wait=False)
else:
# op succeeded
print("Thread finished successfully {}".format(threading.current_thread().name))
futures.remove(fut)
print("Bye")
哪些输出
➜ python3 exception-in-thread.py
Running in ThreadPoolExecutor-0_0
Running in ThreadPoolExecutor-0_1
Timeout.. retry in thread MainThread
Exception was thrown in thread MainThread, exiting
Stop execution due to exception..
Bye
来自https://docs.python.org/3/library/signal.html#signals-and-threads: "Python信号处理程序始终在主解释器的主Python线程中执行,即使信号是在另一个线程中接收到的。这意味着信号不能用作线程间通信的手段。您可以使用线程模块中的同步原语代替。"
在这种情况下,我们可以使用threading
模块中的“事件对象”来同步线程。
import logging
import threading
import time
import signal
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
logging.debug('wait_for_event starting')
while not e.is_set():
# event_is_set = e.wait(0.3)
pass
if e.is_set():
logging.debug("Keyboard interrupt received from main thread")
return
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
e = threading.Event()
def signal_handler(sig, frame):
global e
logging.debug("Keyboard interrupt receieved by means of Ctrl + C")
e.set()
t1 = threading.Thread(
name='block',
target=wait_for_event,
args=(e,),
)
t1.start()
signal.signal(signal.SIGINT, signal_handler)
print("press Ctrl+C to stop")
signal.pause()
结果:
(block ) wait_for_event starting
press Ctrl+C to stop
^C(MainThread) Keyboard interrupt receieved by means of Ctrl + C
(block ) Keyboard interrupt received from main thread