有没有办法终止一个线程?

987

是否可能在不设置/检查任何标志/信号量等的情况下终止正在运行的线程?


特别是,在线程中是否有一种方法可以生成类似于KeyboardInterrupt的异常? - Josiah Yoder
31个回答

2

Python版本:3.8

使用守护线程来执行我们想要的操作,如果我们想要终止守护线程,只需让父线程退出,系统就会终止由父线程创建的守护线程。

同时支持协程和协程函数。

def main():
    start_time = time.perf_counter()
    t1 = ExitThread(time.sleep, (10,), debug=False)
    t1.start()
    time.sleep(0.5)
    t1.exit()
    try:
        print(t1.result_future.result())
    except concurrent.futures.CancelledError:
        pass
    end_time = time.perf_counter()
    print(f"time cost {end_time - start_time:0.2f}")

以下是ExitThread源代码
import concurrent.futures
import threading
import typing
import asyncio


class _WorkItem(object):
    """ concurrent\futures\thread.py

    """

    def __init__(self, future, fn, args, kwargs, *, debug=None):
        self._debug = debug
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if self._debug:
            print("ExitThread._WorkItem run")
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            coroutine = None
            if asyncio.iscoroutinefunction(self.fn):
                coroutine = self.fn(*self.args, **self.kwargs)
            elif asyncio.iscoroutine(self.fn):
                coroutine = self.fn
            if coroutine is None:
                result = self.fn(*self.args, **self.kwargs)
            else:
                result = asyncio.run(coroutine)
            if self._debug:
                print("_WorkItem done")
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)


class ExitThread:
    """ Like a stoppable thread

    Using coroutine for target then exit before running may cause RuntimeWarning.

    """

    def __init__(self, target: typing.Union[typing.Coroutine, typing.Callable] = None
                 , args=(), kwargs={}, *, daemon=None, debug=None):
        #
        self._debug = debug
        self._parent_thread = threading.Thread(target=self._parent_thread_run, name="ExitThread_parent_thread"
                                               , daemon=daemon)
        self._child_daemon_thread = None
        self.result_future = concurrent.futures.Future()
        self._workItem = _WorkItem(self.result_future, target, args, kwargs, debug=debug)
        self._parent_thread_exit_lock = threading.Lock()
        self._parent_thread_exit_lock.acquire()
        self._parent_thread_exit_lock_released = False  # When done it will be True
        self._started = False
        self._exited = False
        self.result_future.add_done_callback(self._release_parent_thread_exit_lock)

    def _parent_thread_run(self):
        self._child_daemon_thread = threading.Thread(target=self._child_daemon_thread_run
                                                     , name="ExitThread_child_daemon_thread"
                                                     , daemon=True)
        self._child_daemon_thread.start()
        # Block manager thread
        self._parent_thread_exit_lock.acquire()
        self._parent_thread_exit_lock.release()
        if self._debug:
            print("ExitThread._parent_thread_run exit")

    def _release_parent_thread_exit_lock(self, _future):
        if self._debug:
            print(f"ExitThread._release_parent_thread_exit_lock {self._parent_thread_exit_lock_released} {_future}")
        if not self._parent_thread_exit_lock_released:
            self._parent_thread_exit_lock_released = True
            self._parent_thread_exit_lock.release()

    def _child_daemon_thread_run(self):
        self._workItem.run()

    def start(self):
        if self._debug:
            print(f"ExitThread.start {self._started}")
        if not self._started:
            self._started = True
            self._parent_thread.start()

    def exit(self):
        if self._debug:
            print(f"ExitThread.exit exited: {self._exited} lock_released: {self._parent_thread_exit_lock_released}")
        if self._parent_thread_exit_lock_released:
            return
        if not self._exited:
            self._exited = True
            if not self.result_future.cancel():
                if self.result_future.running():
                    self.result_future.set_exception(concurrent.futures.CancelledError())

2
假设您想要多个相同功能的线程,这是我认为最简单的实现方法之一,可以通过id停止其中一个:
import time
from threading import Thread

def doit(id=0):
    doit.stop=0
    print("start id:%d"%id)
    while 1:
        time.sleep(1)
        print(".")
        if doit.stop==id:
            doit.stop=0
            break
    print("end thread %d"%id)

t5=Thread(target=doit, args=(5,))
t6=Thread(target=doit, args=(6,))

t5.start() ; t6.start()
time.sleep(2)
doit.stop =5  #kill t5
time.sleep(2)
doit.stop =6  #kill t6

这里的好处在于,您可以拥有多个相同和不同的函数,通过functionname.stop停止它们所有。

如果您只想运行一个函数线程,则无需记住其id。只需在doit.stop>0时停止即可。


纯函数式线程,没有任何类。 - F.Tamy

1
一种替代方法是使用 signal.pthread_kill 发送停止信号。
from signal import pthread_kill, SIGTSTP
from threading import Thread
from itertools import count
from time import sleep

def target():
    for num in count():
        print(num)
        sleep(1)

thread = Thread(target=target)
thread.start()
sleep(5)
pthread_kill(thread.ident, SIGTSTP)

结果

0
1
2
3
4

[14]+  Stopped

5
可用性:Unix - imba-tjd

1

如@Kozyarchuk的答案中所述,安装跟踪器是有效的。由于该答案未包含代码,因此这里提供一个可用的实例:

import sys, threading, time 

class TraceThread(threading.Thread): 
    def __init__(self, *args, **keywords): 
        threading.Thread.__init__(self, *args, **keywords) 
        self.killed = False
    def start(self): 
        self._run = self.run 
        self.run = self.settrace_and_run
        threading.Thread.start(self) 
    def settrace_and_run(self): 
        sys.settrace(self.globaltrace) 
        self._run()
    def globaltrace(self, frame, event, arg): 
        return self.localtrace if event == 'call' else None
    def localtrace(self, frame, event, arg): 
        if self.killed and event == 'line': 
            raise SystemExit() 
        return self.localtrace 

def f(): 
    while True: 
        print('1') 
        time.sleep(2)
        print('2') 
        time.sleep(2)
        print('3') 
        time.sleep(2)

t = TraceThread(target=f) 
t.start() 
time.sleep(2.5) 
t.killed = True

它在打印了12后停止。 3没有被打印出来。

0

如果您真的需要能够终止子任务的能力,则请使用替代实现。 multiprocessinggevent 都支持无差别地终止“线程”。

Python 的线程不支持取消。甚至不要尝试。您的代码很可能会死锁、损坏或泄漏内存,或者产生其他意外且难以调试的“有趣”效果,这些效果很少并且是非确定性的。


2
是的,我知道这两种方法都不严格符合“线程”的定义,但如果你的代码适合它们的模型(或者可以调整使之适合),它们都能够发挥作用。 - Matthias Urlichs

0
Pieter Hintjens是ØMQ项目的创始人之一,他说,使用ØMQ并避免使用锁、互斥量、事件等同步原语,是编写多线程程序最明智和最安全的方式。

http://zguide.zeromq.org/py:all#Multithreading-with-ZeroMQ

这包括告诉子线程,它应该取消它的工作。这可以通过为线程配备一个 ØMQ-socket 并在该 socket 上轮询以获取一条消息来完成,该消息表示应该取消。

该链接还提供了一个使用 ØMQ 的多线程 Python 代码示例。


-1

你可以在进程中执行命令,然后使用进程 ID 来终止它。 我需要在两个线程之间进行同步,其中一个线程本身不会返回。

processIds = []

def executeRecord(command):
    print(command)

    process = subprocess.Popen(command, stdout=subprocess.PIPE)
    processIds.append(process.pid)
    print(processIds[0])

    #Command that doesn't return by itself
    process.stdout.read().decode("utf-8")
    return;


def recordThread(command, timeOut):

    thread = Thread(target=executeRecord, args=(command,))
    thread.start()
    thread.join(timeOut)

    os.kill(processIds.pop(), signal.SIGINT)

    return;

-1

这似乎可以在Windows 7上使用pywin32工作

my_thread = threading.Thread()
my_thread.start()
my_thread._Thread__stop()

-2

这是一个糟糕的答案,请查看评论

以下是如何操作:

from threading import *

...

for thread in enumerate():
    if thread.isAlive():
        try:
            thread._Thread__stop()
        except:
            print(str(thread.getName()) + ' could not be terminated'))

等待几秒钟,然后您的线程应该停止。还要检查thread._Thread__delete()方法。

我建议使用thread.quit()方法以方便操作。例如,如果您的线程中有一个套接字,我建议在您的套接字处理类中创建一个quit()方法,终止套接字,然后在quit()内运行thread._Thread__stop()


12
"更多关于“这并不能真正停止一个线程”的细节会很有帮助。" - 2371
19
基本上,调用_Thread__stop方法除了告诉Python线程已停止之外,没有其他影响。实际上,它可能会继续运行。请参见https://gist.github.com/2787191以查看示例。 - Bluehorn
35
这是明显的错误。_Thread__stop() 仅仅是将一个线程标记为已停止,它并不能真正停止线程!绝不要这么做。阅读此文 - dotancohen

-2

使用setDaemon(True)启动子线程。

def bootstrap(_filename):
    mb = ModelBootstrap(filename=_filename) # Has many Daemon threads. All get stopped automatically when main thread is stopped.

t = threading.Thread(target=bootstrap,args=('models.conf',))
t.setDaemon(False)

while True:
    t.start()
    time.sleep(10) # I am just allowing the sub-thread to run for 10 sec. You can listen on an event to stop execution.
    print('Thread stopped')
    break

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接