在Python中重启一个线程

20
我正在尝试使用Python 3.4为一个项目制作多线程飞行软件,在此过程中,如果传感器读取或其他异常崩溃发生I/O错误,需要让线程能够重新启动自身。因此,我正在研究如何制作一个监视程序用于检查线程是否已经停止并重启它们。

起初,我尝试只是检查线程是否还活着并重启它,这样做:

>>> if not a_thread.isAlive():
...     a_thread.start()
Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
  File "c:\Python34\lib\threading.py", line 847, in start
    raise RuntimeError("threads can only be started once")
RuntimeError: threads can only be started once

这种行为从线程和Python本身的角度来看是有意义的,但使我的工作更加困难。因此我使用字典实现了一种解决方案,将初始线程存储到一个新对象中并在必要时启动它。不幸的是,这也行不通。 以下是一个基本示例:
import threading
import logging
import queue
import time
from copy import copy, deepcopy

def a():
    print("I'm thread a")
def b():
    print("I'm thread b")

# Create thread objects
thread_dict = {
'a': threading.Thread(target=a, name='a'),
'b': threading.Thread(target=b, name='b')
}

threads = [copy(t) for t in thread_dict.values()]

for t in threads:
    t.start()
for i in range(len(threads)):
    if not threads[i].isAlive():
        temp = thread_dict[threads[i].name]
        threads[i] = deepcopy(temp)
        threads[i].start()
    thread(i).join(5)

返回:

I'm thread a
I'm thread b
Traceback (most recent call last):
  File "main_test.py", line 25, in <module>
    threads[i] = deepcopy(temp)
  File "c:\Python34\lib\copy.py", line 182, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  ... (there's about 20 lines of traceback within copy)
  File "c:\Python34\lib\copyreg.py", line 88, in __newobj__
    return cls.__new__(cls, *args)
TypeError: object.__new__(_thread.lock) is not safe, use _thread.lock.__new__()

显然,线程对象不可安全复制... 有没有重新启动线程的方法,而无需重新创建整个对象?


3
你能否只处理线程内的崩溃,以便不需要重新启动? - 101
正如“figs”所说,你应该在线程内部处理异常,而不是尝试重新启动它。请参阅此处的异常文档:https://docs.python.org/2/tutorial/errors.html 这样做更加简单和符合Python风格。 - CoMartel
不要重启它,只需在其上编写一个包装器。 - 黃瑞昌
4个回答

45

没有理由让你的线程死亡。

如果它们实际上崩溃了,整个程序将会崩溃。

如果它们只是引发异常,您可以捕获这些异常。

如果它们正常返回,您可以选择不这样做。

您甚至可以轻松地包装一个线程函数,让它在异常或返回时重新启动:

def threadwrap(threadfunc):
    def wrapper():
        while True:
            try:
                threadfunc()
            except BaseException as e:
                print('{!r}; restarting thread'.format(e))
            else:
                print('exited normally, bad thread; restarting')
    return wrapper

thread_dict = {
    'a': threading.Thread(target=wrapper(a), name='a'),
    'b': threading.Thread(target=wrapper(b), name='b')
}    

问题已解决。


你不能重新启动一个线程。

大多数平台都没有这样的功能。

在概念上,这也没有任何意义。当线程完成时,它的堆栈已经死亡;它的父线程被标记或者通知;一旦它加入到其他线程中,它的资源会被销毁(包括操作系统级别的资源比如进程表项)。唯一的重启它的方式就是创建一个全新的一组东西,而你已经可以通过创建新的线程来做到这一点。

所以,干吧。如果你真的不想在内部处理异常,那就存储构造参数并使用它们来启动一个新线程。

你甚至可以创建自己的子类来为你保留这些参数:

class RestartableThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        self._args, self._kwargs = args, kwargs
        super().__init__(*args, **kwargs)
    def clone(self):
        return RestartableThread(*self._args, **self._kwargs)

现在可以轻松“复制”线程(包括您想要的语义):

if not a_thread.is_alive():
    a_thread = a_thread.clone()

是的,threading.Thread对象不可安全复制

你期望会发生什么呢?最好情况下,你会得到一个不同的包装器,包装了相同的操作系统级线程对象,所以你可以欺骗Python,使其没有注意到你试图做违法的、可能导致段错误的事情。


4
你是不是在调用 threading.Thread 时想使用 target=threadwrap(a) 参数? - Sandy Gifford

2

如果你想检查线程是否被终止,然后重新启动同一线程,则可以首先将其设置为 None。

if not a_thread.isAlive():
    a_thread = None
    a_thread .Thread(target=threadFunc)
    a_thread.start()

你是不是指的是 a_thread = Thread(target=threadFunc) - undefined

0
#although not exactly answers question, i wrote it so i could,
#run any function as a thread, stop it start again etc.
from threading import Thread
import time
from functools import *


class ThreadAlreadyActive(Exception):
    pass


class Threads(Thread):
    def __init__(self, *args, **kwargs):
        self.run_count = 0
        self.stop_ = False
        Thread.__init__(self, *args, **kwargs)

    def __call__(self, *args, **kwargs):
        if self.is_alive():
            raise ThreadAlreadyActive()
        else:
            if self.run_count == 0:
                self.start()
            else:
                Thread.__init__(self, target=self.fn, args=self.args)
                self.start()

    def start(self):
        if not (self.is_alive()):
            self.run_count += 1
            self.stop_ = False
            Thread.start(self)
        else:
            print("already running")

    def stop(self):
        self.stop_ = True
        if self.is_alive():
            time.sleep(10)
            self.stop()
        else:
            pass

    @classmethod
    def register_function(cls, args, fn):
        obj = cls(target=fn, args=args)
        obj.fn = fn
        obj.args = args
        obj.stop_ = False
        return obj

    @classmethod
    def register_thread(cls, args=None):
        return partial(cls.register_function, args)


@Threads.register_thread(args=())
def hello():
    while True:
        time.sleep(5)
        print("im alive")
        if hello.stop_:
            break
        else:
            continue


if __name__ == "__main__":
    hello()
    time.sleep(25)
    print("calling again")
    hello.start()
    time.sleep(20)
    print("issuing stop")
    hello.stop()
    time.sleep(5)
    print("calling again")
    hello()
    time.sleep(10)
    print(hello.is_alive())

0

这里是一个完全重启线程的示例。 也许不是最好的解决方案,但对于我的目的来说非常有效。

#!/usr/bin/python3

import threading
from time import sleep

def thread1(thread_id, number):
    thr = threading.currentThread()
    print("[thread id:%d] start function" % (thread_id))
    while True:
       if getattr(thr, "need_stop", False):
          print("[thread id:%d] Thread was stopped by external signal number"%(thread_id))
          exit(0)
       print("[thread id:%d] Number: %d<- "%(thread_id, number))
       sleep(1)

def main():
    thread_1 = []
    thread_2 = []
    for i in range(10):
       sleep(0.5)
       if i in [0,1,4,6]:
          if len(thread_2) != 0:
              for thr2 in thread_2:
                  thr2.need_stop = True
                  thr2.join()
                  thread_2.remove(thr2)

          if 'thr_1' not in locals():
              thr_1 = threading.Thread(target=thread1, args=([1, i]))
          if thr_1.is_alive() is False:
              try:
                thr_1.start()
                thread_1.append(thr_1)
              except Exception as e:
                del thr_1
                thr_1 = threading.Thread(target=thread1, args=([1, i]))
                thr_1.start()
                thread_1.append(thr_1)
       else:
          if len(thread_1) != 0:
              for thr1 in thread_1:
                  thr1.need_stop = True
                  thr1.join()
                  thread_1.remove(thr1)

          if 'thr_2' not in locals():
              thr_2 = threading.Thread(target=thread1, args=([2, i]))
          if thr_2.is_alive() is False:
              try:
                thr_2.start()
                thread_2.append(thr_2)
              except Exception as e:
                del thr_2
                thr_2 = threading.Thread(target=thread1, args=([2, i]))
                thr_2.start()
                thread_2.append(thr_2)

    # finish all threads
    if len(thread_2) != 0:
        for thr2 in thread_2:
            thr2.need_stop = True
            thr2.join()
            thread_2.remove(thr2)
    if len(thread_1) != 0:
        for thr1 in thread_1:
            thr1.need_stop = True
            thr1.join()
            thread_1.remove(thr1)
  

if __name__ == '__main__':
   main()

代码的要点是,如果线程已经在运行,则不要触碰它。从输出中可以看到,没有数字1、3,因为线程已经在运行。 输出:
$ python3 test_thread.py
[thread id:1] start function
[thread id:1] Number: 0<-
[thread id:1] Number: 0<-
[thread id:1] Thread was stopped by external signal number
[thread id:2] start function
[thread id:2] Number: 2<-
[thread id:2] Number: 2<-
[thread id:2] Thread was stopped by external signal number
[thread id:1] start function
[thread id:1] Number: 4<-
[thread id:1] Thread was stopped by external signal number
[thread id:2] start function
[thread id:2] Number: 5<-
[thread id:2] Thread was stopped by external signal number
[thread id:1] start function
[thread id:1] Number: 6<-
[thread id:1] Thread was stopped by external signal number
[thread id:2] start function
[thread id:2] Number: 7<-
[thread id:2] Number: 7<-
[thread id:2] Thread was stopped by external signal number

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接