How to make Python threads finish gracefully

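I'm working on a project that involves data collection and logging. I have two threads running, a collection thread and a logging thread, both started from the main thread. I'm trying to let the program terminate gracefully when Ctrl-C is pressed.
I'm using a threading.Event to signal the threads to end their respective loops. It works fine for stopping the sim_collectData method, but it doesn't seem to stop the logData thread properly. The "Collection terminated" print statement is never executed, and the program just stalls. (It doesn't terminate, it just sits there.) The second while loop in logData is meant to make sure that everything in the queue gets logged. The goal is for Ctrl-C to stop the collection thread immediately, then let the logging thread finish emptying the queue, and only then terminate the program completely. (For now the data is just printed; eventually it will be logged to a database.)
I don't understand why the second thread never terminates. I based this on the answer to "Stopping a thread after a certain amount of time". Am I missing something?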
import Queue
import random
import threading
import time

def sim_collectData(input_queue, stop_event):
    ''' this provides some output simulating the serial
    data from the data logging hardware. 
    '''
    n = 0
    while not stop_event.is_set():
        input_queue.put("DATA: <here are some random data> " + str(n))
        stop_event.wait(random.randint(0,5))
        n += 1
    print "Terminating data collection..."
    return

def logData(input_queue, stop_event):
    n = 0

    # we *don't* want to loop based on queue size because the queue could
    # theoretically be empty while waiting on some data.
    while not stop_event.is_set():
        d = input_queue.get()
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1

    # if the stop event is received and the previous loop terminates,
    # finish logging the rest of the items in the queue.
    print "Collection terminated. Logging remaining data to database..."
    while not input_queue.empty():
        d = input_queue.get()
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1
    return


def main():
    input_queue = Queue.Queue()

    stop_event = threading.Event() # used to signal termination to the threads

    print "Starting data collection thread...",
    collection_thread = threading.Thread(target=sim_collectData, args=(input_queue, stop_event))
    collection_thread.start()
    print "Done."

    print "Starting logging thread...",
    logging_thread = threading.Thread(target=logData, args=(input_queue, stop_event))
    logging_thread.start()
    print "Done."

    try:
        while True:
            time.sleep(10)
    except (KeyboardInterrupt, SystemExit):
        # stop data collection. Let the logging thread finish logging everything in the queue
        stop_event.set()

main()

@tdelaney has the best solution. All the polling/timeout answers are poor. - Martin James
You could, of course, add a timeout to input_queue.get(). - Jan
4 Answers

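The problem is that your logger is waiting on d = input_queue.get() and never checks the event. One solution is to skip the event entirely and invent a unique message that tells the logger to stop. When you get the signal, send that message to the queue.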
import threading
import Queue
import random
import time

def sim_collectData(input_queue, stop_event):
    ''' this provides some output simulating the serial
    data from the data logging hardware. 
    '''
    n = 0
    while not stop_event.is_set():
        input_queue.put("DATA: <here are some random data> " + str(n))
        stop_event.wait(random.randint(0,5))
        n += 1
    print "Terminating data collection..."
    input_queue.put(None)
    return

def logData(input_queue):
    n = 0

    # we *don't* want to loop based on queue size because the queue could
    # theoretically be empty while waiting on some data.
    while True:
        d = input_queue.get()
        if d is None:
            input_queue.task_done()
            return
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1

def main():
    input_queue = Queue.Queue()

    stop_event = threading.Event() # used to signal termination to the threads

    print "Starting data collection thread...",
    collection_thread = threading.Thread(target=sim_collectData, args=(input_queue, stop_event))
    collection_thread.start()
    print "Done."

    print "Starting logging thread...",
    logging_thread = threading.Thread(target=logData, args=(input_queue,))
    logging_thread.start()
    print "Done."

    try:
        while True:
            time.sleep(10)
    except (KeyboardInterrupt, SystemExit):
        # stop data collection. Let the logging thread finish logging everything in the queue
        stop_event.set()

main()

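Note that at the end you have to put as many None values into the queue as there are blocked threads. - bodo
@canaaerus - That's a very good point. In this case there is only one worker thread, but noting that N worker threads need N termination messages is a nice addition. - tdelaney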
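To make bodo's point concrete, here is a minimal Python 3 sketch (module queue rather than Python 2's Queue) with several consumer threads. The names _SENTINEL, worker and run and their parameters are hypothetical, chosen only for illustration. Because the sentinels are queued after all the data, and each worker returns as soon as it takes one, putting exactly one sentinel per worker lets every worker exit only after all the data has been processed.

```python
import queue
import threading

_SENTINEL = object()  # hypothetical unique "stop" marker, never equal to real data


def worker(q, results, lock):
    """Consume items until this worker receives a sentinel."""
    while True:
        item = q.get()
        try:
            if item is _SENTINEL:
                return  # finally still runs, so the sentinel is task_done()'d
            with lock:
                results.append(item)
        finally:
            q.task_done()


def run(num_workers=3, num_items=10):
    q = queue.Queue()
    results = []
    lock = threading.Lock()
    workers = [threading.Thread(target=worker, args=(q, results, lock))
               for _ in range(num_workers)]
    for t in workers:
        t.start()
    for i in range(num_items):
        q.put("DATA: %d" % i)
    # One sentinel per blocked consumer: each worker exits after taking one.
    for _ in workers:
        q.put(_SENTINEL)
    for t in workers:
        t.join()
    return results


if __name__ == "__main__":
    print(sorted(run()))
```

With fewer sentinels than workers, the remaining workers would stay blocked in q.get() forever, which is exactly the hang from the question.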

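I'm no threading expert, but in your logData function the first d = input_queue.get() is blocking. That is, if the queue is empty, it waits forever until a message arrives. That is most likely why the logData thread never terminates: it is still waiting for a queue message.
See the [Python docs] to change it to a non-blocking queue read: use .get(False) or .get_nowait(). Both, however, need some exception handling for the case where the queue is empty.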
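A small Python 3 sketch of the exception handling this answer refers to (in Python 2 the module is Queue and the exception Queue.Empty). The helper name drain_nonblocking is hypothetical. It only collects what is in the queue at that moment; it cannot tell "empty for now" apart from "producer finished", which is why the other answers prefer a sentinel.

```python
import queue


def drain_nonblocking(q):
    """Collect everything currently in q without ever blocking.

    q.get(False) and q.get_nowait() are equivalent: both raise
    queue.Empty immediately instead of waiting for an item.
    """
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            break  # nothing left right now: return instead of waiting forever
        q.task_done()
    return items


q = queue.Queue()
for i in range(3):
    q.put("DATA: %d" % i)
print(drain_nonblocking(q))  # ['DATA: 0', 'DATA: 1', 'DATA: 2']
print(drain_nonblocking(q))  # [] (an empty queue returns at once)
```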

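You are calling get on input_queue as a blocking call with no timeout. Anywhere in logData, if you call input_queue.get() while the queue is empty, it blocks indefinitely, which prevents logging_thread from finishing.
To fix this, you need to call input_queue.get_nowait() or pass a timeout to input_queue.get().
Here's my suggestion: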
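def logData(input_queue, stop_event):
    n = 0

    while not stop_event.is_set():
        try:
            d = input_queue.get_nowait()
        except Queue.Empty:
            # queue is empty right now: back off, then re-check stop_event
            time.sleep(1)
            continue
        if d.startswith("DATA:"):
            print "LOG: " + d
            n += 1
        input_queue.task_done()
    # note: anything still in the queue once stop_event is set is not logged
    return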

You're also signaling the threads to terminate but not waiting for them to finish. Consider doing this in your main function:
try:
    while True:
        time.sleep(10)
except (KeyboardInterrupt, SystemExit):
    stop_event.set()
    collection_thread.join()
    logging_thread.join()

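Meh, there's no need for polling/sleeping here. - tdelaney
@tdelaney, you may be right; using a timeout on get would probably be the better way. But I threw this together off the top of my head, so there it is. - rrhartjr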
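For comparison, a minimal Python 3 sketch of the timeout variant discussed in these comments, keeping the question's two-phase design (log until the event is set, then drain). The function name log_data, the out list standing in for the database, and poll_timeout are hypothetical. It assumes the producer has stopped putting items before the event is set; otherwise an item put after the final drain would be missed. It still wakes up every poll_timeout seconds, which is the polling tdelaney objects to; the sentinel approach avoids that.

```python
import queue
import threading


def log_data(input_queue, stop_event, out, poll_timeout=0.5):
    """Log items until stop_event is set, then drain what is left.

    get(timeout=...) raises queue.Empty after poll_timeout seconds, so the
    loop re-checks stop_event even when no data arrives.
    """
    while not stop_event.is_set():
        try:
            d = input_queue.get(timeout=poll_timeout)
        except queue.Empty:
            continue  # no data yet; re-check the event
        out.append(d)  # stand-in for writing to the database
        input_queue.task_done()
    # Stop was requested: log whatever is still queued, without blocking.
    while True:
        try:
            d = input_queue.get_nowait()
        except queue.Empty:
            break
        out.append(d)
        input_queue.task_done()


if __name__ == "__main__":
    q = queue.Queue()
    stop = threading.Event()
    logged = []
    t = threading.Thread(target=log_data, args=(q, stop, logged, 0.1))
    t.start()
    for i in range(5):
        q.put("DATA: %d" % i)
    stop.set()  # producer is done; ask the logger to drain and exit
    t.join()
    print(logged)
```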


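Building on tdelaney's answer, I created an iterator-based approach. The iterator exits when it encounters the termination message. I also added a counter of how many get calls are currently blocking, and a stop method that sends exactly that many termination messages. To prevent a race condition between incrementing and reading the counter, I set a stop flag there. Also, I don't use None as the termination message, because it can't be compared with other data types when using a PriorityQueue.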

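There are two limitations I haven't removed. First, the stop method waits for the queue to be empty before shutting down the threads. Second, I haven't written any code to make the queue reusable after stop. The latter is probably easy to add, while the former requires care about concurrency and about the context in which the code is used.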

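You have to decide whether you also want stop to wait until all termination messages have been consumed. I chose to put the necessary join there, but you could remove it.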

So here is the code:

import threading, queue

from functools import total_ordering
@total_ordering
class Final:
    def __repr__(self):
        return "∞"

    def __lt__(self, other):
        return False

    def __eq__(self, other):
        return isinstance(other, Final)

Infty = Final()

class IterQueue(queue.Queue):
    def __init__(self):
        self.lock = threading.Lock()
        self.stopped = False
        self.getters = 0
        super().__init__()

    def __iter__(self):
        return self

    def get(self):
        raise NotImplementedError("This queue may only be used as an iterator.")

    def __next__(self):
        with self.lock:
            if self.stopped:
                raise StopIteration
            self.getters += 1
        data = super().get()
        if data == Infty:
            self.task_done()
            raise StopIteration
        with self.lock:
            self.getters -= 1
        return data

    def stop(self):
        self.join()
        self.stopped = True
        with self.lock:
            for i in range(self.getters):
                self.put(Infty)
        self.join()

class IterPriorityQueue(IterQueue, queue.PriorityQueue):
    pass
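A small Python 3 check of why None does not work as the termination message in a PriorityQueue, and why Final is written to compare greater than everything. The class below is a copy of the answer's Final (only the repr differs); none_is_orderable is a hypothetical helper. PriorityQueue is backed by heapq, which orders items with "<", and in Python 3 None has no ordering against str.

```python
import queue
from functools import total_ordering


@total_ordering
class Final:
    """Copy of the answer's sentinel: compares greater than any other item."""

    def __repr__(self):
        return "Final()"

    def __lt__(self, other):
        return False

    def __eq__(self, other):
        return isinstance(other, Final)


def none_is_orderable():
    """Return False on Python 3, where None < str raises TypeError."""
    try:
        None < "DATA: 0"
    except TypeError:
        return False
    return True


pq = queue.PriorityQueue()
for item in ["DATA: 2", Final(), "DATA: 1"]:
    pq.put(item)
print(none_is_orderable())           # False
print([pq.get() for _ in range(3)])  # ['DATA: 1', 'DATA: 2', Final()]
```

For "DATA: 1" < Final(), str returns NotImplemented, so Python falls back to the __gt__ that total_ordering derives from __lt__ and __eq__, which is True; the sentinel therefore always comes out last.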

Oh, I wrote this in Python 3.2. So, after backporting it to Python 2:

import threading, Queue

from functools import total_ordering
@total_ordering
class Final:
    def __repr__(self):
        return "Infinity"

    def __lt__(self, other):
        return False

    def __eq__(self, other):
        return isinstance(other, Final)

Infty = Final()

class IterQueue(Queue.Queue, object):
    def __init__(self):
        self.lock = threading.Lock()
        self.stopped = False
        self.getters = 0
        super(IterQueue, self).__init__()

    def __iter__(self):
        return self

    def get(self):
        raise NotImplementedError("This queue may only be used as an iterator.")

    def next(self):
        with self.lock:
            if self.stopped:
                raise StopIteration
            self.getters += 1
        data = super(IterQueue, self).get()
        if data == Infty:
            self.task_done()
            raise StopIteration
        with self.lock:
            self.getters -= 1
        return data

    def stop(self):
        self.join()
        self.stopped = True
        with self.lock:
            for i in range(self.getters):
                self.put(Infty)
        self.join()

class IterPriorityQueue(IterQueue, Queue.PriorityQueue):
    pass

You can use it like this:

import random
import time

def sim_collectData(input_queue, stop_event):
    ''' this provides some output simulating the serial
    data from the data logging hardware. 
    '''
    n = 0
    while not stop_event.is_set():
        input_queue.put("DATA: <here are some random data> " + str(n))
        stop_event.wait(random.randint(0,5))
        n += 1
    print "Terminating data collection..."
    return

def logData(input_queue):
    n = 0

    # we *don't* want to loop based on queue size because the queue could
    # theoretically be empty while waiting on some data.
    for d in input_queue:
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1

def main():
    input_queue = IterQueue()

    stop_event = threading.Event() # used to signal termination to the threads

    print "Starting data collection thread...",
    collection_thread = threading.Thread(target=sim_collectData, args=(input_queue, stop_event))
    collection_thread.start()
    print "Done."

    print "Starting logging thread...",
    logging_thread = threading.Thread(target=logData, args=(input_queue,))
    logging_thread.start()
    print "Done."

    try:
        while True:
            time.sleep(10)
    except (KeyboardInterrupt, SystemExit):
        # stop data collection. Let the logging thread finish logging everything in the queue
        stop_event.set()
        input_queue.stop()

main()
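A related Python 3 idiom, not what the answer above uses: the built-in two-argument form iter(callable, sentinel) already turns a blocking get into an iterator that stops at a sentinel. This is a minimal sketch; STOP and log_data are hypothetical names. It has none of IterQueue's getter counting or stop() method, so the producer must still put one STOP per consumer. Because object() is not orderable, it is also unsuitable for a PriorityQueue, which is the case the Final class handles.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel; object() compares equal only to itself


def log_data(input_queue, out):
    # iter(callable, sentinel) keeps calling input_queue.get() and stops
    # as soon as the returned value == STOP.
    for d in iter(input_queue.get, STOP):
        if d.startswith("DATA:"):
            out.append(d)
        input_queue.task_done()
    input_queue.task_done()  # account for the STOP item itself


if __name__ == "__main__":
    q = queue.Queue()
    logged = []
    t = threading.Thread(target=log_data, args=(q, logged))
    t.start()
    for i in range(3):
        q.put("DATA: %d" % i)
    q.put(STOP)  # one STOP per consumer thread
    t.join()
    q.join()  # returns at once: every item, including STOP, was task_done()'d
    print(logged)  # ['DATA: 0', 'DATA: 1', 'DATA: 2']
```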

Page content provided by Stack Overflow (translated from the original English post).