如何在Python的主线程中终止生产者消费者线程?

3

我有一个生产者和一个消费者线程(threading.Thread),它们共享一个类型为Queue队列

Producer运行:

while self.running:
    product = produced() ### I/O operations
    queue.put(product)

消费者 run:
while self.running or not queue.empty():
    product = queue.get()
    time.sleep(several_seconds) ###
    consume(product)

现在我需要从主线程终止这两个线程,要求在终止前queue必须为空(已全部消耗)。

目前我正在使用以下代码来终止这两个线程:

主线程stop

producer.running = False
producer.join()
consumer.running = False
consumer.join()

但是如果有更多的消费者,我认为这是不安全的。
另外,我不确定 sleep 是否会向生产者释放调度,以便它可以生产更多的产品。实际上,我发现生产者一直处于“饥饿”状态,但我不确定这是否是根本原因。
有没有好的方法来处理这种情况?

你想在什么时候终止producerconsumer,同时又有一个IO阻塞器,那么你在这里如何控制你的parent - gsb-eng
生产者阻塞的I/O操作是否具有超时功能,终止是否可以延迟直到超时发生?或者,这些I/O操作是否连续流式传输,以便您保证更多数据会及时显示? - Patrick Maupin
@gsb-eng 我希望在主线程中有一些事件发生时终止它,比如“按q键退出”。 - Hongxu Chen
@PatrickMaupin 我的生产者会在满足条件时发送请求并将数据放入队列中。它没有超时机制,如果在请求期间出现异常,它会不断重试。适当的延迟是可以接受的;而且生产者不断重试的情况很少,所以我没有太考虑这个问题。 - Hongxu Chen
3个回答

8
您可以在队列中放置一个哨兵对象来表示任务结束,这将导致所有消费者终止:
_sentinel = object()

def producer(queue):
    while running:
       # produce some data
       queue.put(data)
    queue.put(_sentinel)

def consumer(queue):
    while True:
        data = queue.get()
        if data is _sentinel:
            # put it back so that other consumers see it
            queue.put(_sentinel)
            break
        # Process data

这段代码片段是从《Python Cookbook 12.3》中无耻地复制的。

  1. 使用_sentinel标记队列的末尾。如果生产者没有产生None任务,则None也可以工作,但使用_sentinel对于更一般的情况更安全。
  2. 不需要为每个消费者将多个结束标记放入队列中。您可能不知道有多少个线程在消费。当一个消费者发现它时,只需将哨兵放回队列中,以便其他消费者得到信号即可。

我认为你提供了与@Patrick Maupin相似的解决方案;已点赞。 - Hongxu Chen

2
编辑2:
a) 您的消费者花费了这么多时间的原因是因为即使没有数据,您的循环仍然持续运行。
b) 我在底部添加了代码以显示如何处理此问题。
如果我理解正确,生产者/消费者是一个连续的过程,例如,延迟关闭直到退出当前阻塞I/O并处理从中接收到的数据是可以接受的。
在这种情况下,为了有序地关闭您的生产者和消费者,我将从主线程到生产者线程添加通信以调用关闭。 在最一般的情况下,这可以是主线程可以使用的队列来排队“关闭”代码,但在单个生产者要停止而永远不会重新启动的简单情况下,它可以简单地是全局关闭标志。
您的生产者应在其主循环中检查此关闭条件(队列或标志),就在它开始阻塞I/O操作之前(例如,在您完成发送其他数据到消费者队列后)。 如果设置了标志,则应在队列上放置一个特殊的数据结束代码(看起来不像正常数据)以告诉消费者正在发生关闭,然后生产者应返回(终止自身)。
当消费者从队列中取出数据时,应修改消费者以检查此数据结束代码。 如果找到数据结束代码,则应进行有序关闭并返回(终止自身)。
如果有多个消费者,则生产者可以在关闭之前为每个消费者排队多个数据结束消息。 由于消费者在读取消息后停止消费,因此它们最终都会关闭。
或者,如果您不知道有多少个消费者,那么消费者的有序关闭的一部分可能是重新排队数据结束代码。
这将确保所有消费者最终看到数据结束代码并关闭,并且当所有消费者完成时,队列中将仍然有一个剩余项--由最后一个消费者排队的数据结束代码。
编辑:
正确表示您的数据结束代码的方法高度依赖于应用程序,但在许多情况下,简单的 None 非常有效。 因为 None 是一个单例,所以消费者可以使用非常高效的 if data is None 结构来处理结束情况。
在某些情况下甚至更有效的另一种可能性是在主要消费者循环之外设置 try / except ,以这样一种方式,即如果发生except,则是因为您正在尝试以一种始终有效的方式解包数据,除非您正在处理数据结束代码。
编辑2:
将这些概念与您的初始代码相结合,现在生产者执行以下操作:
while self.running:
    product = produced() ### I/O operations
    queue.put(product)
for x in range(number_of_consumers):
    queue.put(None)  # Termination code

每个消费者都会这样做:
while 1:
    product = queue.get()
    if product is None:
        break
    consume(product)

主程序可以这样做:
producer.running = False
producer.join()
for consumer in consumers:
    consumer.join()

2

从你的代码中可以观察到,你的consumer会一直等待从队列中获取某些东西,理想情况下,你应该通过设置一些timeout并处理相应的Empty异常来处理这个问题,就像下面这样,理想情况下,这有助于检查每个timeoutwhile self.running or not queue.empty()

while self.running or not queue.empty():
    try:
        product = queue.get(timeout=1)
    except Empty:
        pass
    time.sleep(several_seconds) ###
    consume(product)

我模拟了你的情况并创建了生产者消费者线程,以下是样例代码,它正在运行中,有2个生产者和4个消费者,它运行得非常好。希望这能帮到你!

import time
import threading

from Queue import Queue, Empty

"""A multi-producer, multi-consumer queue."""

# A thread that produces data
class Producer(threading.Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        self.running = True
        self.name = name
        self.args = args
        self.kwargs = kwargs

    def run(self):
        out_q = self.kwargs.get('queue')
        while self.running:
            # Adding some integer
            out_q.put(10)
            # Kepping this thread in sleep not to do many iterations
            time.sleep(0.1)

        print 'producer {name} terminated\n'.format(name=self.name)


# A thread that consumes data
class Consumer(threading.Thread):

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        self.args = args
        self.kwargs = kwargs
        self.producer_alive = True
        self.name = name

    def run(self):
        in_q = self.kwargs.get('queue')

        # Consumer should die one queue is producer si dead and queue is empty.
        while self.producer_alive or not in_q.empty():
            try:
                data = in_q.get(timeout=1)
            except Empty, e:
                pass

            # This part you can do anything to consume time
            if isinstance(data, int):
                # just doing some work, infact you can make this one sleep
                for i in xrange(data + 10**6):
                    pass
            else:
                pass
        print 'Consumer {name} terminated (Is producer alive={pstatus}, Is Queue empty={qstatus})!\n'.format(
            name=self.name, pstatus=self.producer_alive, qstatus=in_q.empty())


# Create the shared queue and launch both thread pools
q = Queue()

producer_pool, consumer_pool = [], []


for i in range(1, 3):
    producer_worker = Producer(kwargs={'queue': q}, name=str(i))
    producer_pool.append(producer_worker)
    producer_worker.start()

for i in xrange(1, 5):
    consumer_worker = Consumer(kwargs={'queue': q}, name=str(i))
    consumer_pool.append(consumer_worker)
    consumer_worker.start()

while 1:
    control_process = raw_input('> Y/N: ')
    if control_process == 'Y':
        for producer in producer_pool:
            producer.running = False
            # Joining this to make sure all the producers die
            producer.join()

        for consumer in consumer_pool:
            # Ideally consumer should stop once producers die
            consumer.producer_alive = False

        break

我猜@Patrick Maupin提供了一个很好的终止方案。但我相信你是对的,我应该为消费者设置超时:-) - Hongxu Chen
@HongxuChen жҲ‘д»Қ然и®ӨдёәеңЁиҝҷз§Қжғ…еҶөдёӢйҖҡиҝҮзӣёеҗҢзҡ„йҳҹеҲ—еҸ‘йҖҒдҝЎеҸ·жҳҜеҘҪзҡ„пјҢдҪҶжҳҜеҰӮжһңжӮЁеҸӘжғіз»Ҳжӯўжҹҗдәӣж¶Ҳиҙ№иҖ…зәҝзЁӢжҖҺд№ҲеҠһпјҹ - gsb-eng
没有考虑到那种情况;抱歉我在多线程方面经验不足。 - Hongxu Chen
1
@HongxuChen 你的方法基本上是正确的,只是当你发现在某些情况下无法通过发送信号来通过队列终止特定消费者时,需要添加timeoutEmpty处理程序。 - gsb-eng
我明白你的想法。因此,为超时和“空”添加异常处理将是更灵活的选择。 - Hongxu Chen

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接