从你的代码可以看出,你的 consumer 会一直阻塞在队列上,等待取出数据。更好的做法是给 get 设置一个 timeout,并处理相应的 Empty 异常,如下所示;这样每次超时之后,循环条件 while self.running or not queue.empty() 都会被重新检查一次。
# Consumer loop sketch: keep draining while the worker is still flagged as
# running or items remain in the queue.
while self.running or not queue.empty():
    try:
        # Bounded wait so the loop condition above is re-checked after every
        # timeout instead of blocking on the queue forever.
        product = queue.get(timeout=1)
    except Empty:
        # Nothing arrived within the timeout, so there is no product to
        # consume on this pass; go back and re-check the loop condition.
        continue
    time.sleep(several_seconds)
    consume(product)
我模拟了你的场景,创建了生产者线程和消费者线程。下面是示例代码:它启动 2 个生产者和 4 个消费者,实际运行良好。希望这能帮到你!
import time
import threading
from Queue import Queue, Empty
"""A multi-producer, multi-consumer queue."""
class Producer(threading.Thread):
    """Thread that repeatedly puts the integer 10 on a shared queue.

    The queue is looked up under the ``'queue'`` key of ``kwargs``. The
    thread keeps producing until some other thread sets ``running`` to
    False.

    Args:
        group: forwarded unchanged to ``threading.Thread``.
        target: forwarded unchanged to ``threading.Thread``.
        name: forwarded unchanged to ``threading.Thread``; when omitted the
            thread keeps the name ``threading.Thread`` assigns.
        args: stored as ``self.args``; not used by ``run``.
        kwargs: dict expected to hold the output queue under ``'queue'``;
            ``None`` is treated as an empty dict.
        verbose: forwarded to ``threading.Thread`` only when not None.
    """

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        thread_options = dict(group=group, target=target, name=name)
        if verbose is not None:
            # Python 3's Thread no longer accepts ``verbose``; only pass it
            # on when the caller explicitly supplied a value.
            thread_options['verbose'] = verbose
        threading.Thread.__init__(self, **thread_options)
        # Cooperative stop flag: cleared from outside to end ``run``.
        self.running = True
        self.args = args
        # Never leave kwargs as None, otherwise ``run`` fails on ``.get``.
        self.kwargs = {} if kwargs is None else kwargs

    def run(self):
        """Put 10 on the queue every 0.1 seconds until ``running`` is False."""
        out_q = self.kwargs.get('queue')
        while self.running:
            out_q.put(10)
            time.sleep(0.1)
        print('producer {name} terminated\n'.format(name=self.name))
class Consumer(threading.Thread):
    """Thread that drains items from a shared queue.

    The queue is looked up under the ``'queue'`` key of ``kwargs``. The
    thread keeps consuming while ``producer_alive`` is True or the queue
    still holds items; another thread clears ``producer_alive`` to let the
    consumer finish once the queue is empty.

    Args:
        group: forwarded unchanged to ``threading.Thread``.
        target: forwarded unchanged to ``threading.Thread``.
        name: forwarded unchanged to ``threading.Thread``; when omitted the
            thread keeps the name ``threading.Thread`` assigns.
        args: stored as ``self.args``; not used by ``run``.
        kwargs: dict expected to hold the input queue under ``'queue'``;
            ``None`` is treated as an empty dict.
        verbose: forwarded to ``threading.Thread`` only when not None.
    """

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        thread_options = dict(group=group, target=target, name=name)
        if verbose is not None:
            # Python 3's Thread no longer accepts ``verbose``; only pass it
            # on when the caller explicitly supplied a value.
            thread_options['verbose'] = verbose
        threading.Thread.__init__(self, **thread_options)
        self.args = args
        # Never leave kwargs as None, otherwise ``run`` fails on ``.get``.
        self.kwargs = {} if kwargs is None else kwargs
        # Cleared from outside once the producers have stopped.
        self.producer_alive = True

    def run(self):
        """Consume queue items until producers are done and the queue is empty."""
        in_q = self.kwargs.get('queue')
        while self.producer_alive or not in_q.empty():
            try:
                # Bounded wait so the loop condition is re-evaluated after
                # every timeout instead of blocking forever.
                data = in_q.get(timeout=1)
            except Empty:
                # Nothing was fetched: skip processing so an unbound or
                # previously handled ``data`` is never touched.
                continue
            if isinstance(data, int):
                # Busy loop standing in for real work on the item; counted
                # down by hand so it runs the same on Python 2 and 3.
                remaining = data + 10**6
                while remaining > 0:
                    remaining -= 1
        print('Consumer {name} terminated (Is producer alive={pstatus}, Is Queue empty={qstatus})!\n'.format(
            name=self.name, pstatus=self.producer_alive, qstatus=in_q.empty()))
# Shared work queue connecting every producer to every consumer.
q = Queue()
producer_pool, consumer_pool = [], []

# Start two producers, named '1' and '2'.
for i in range(1, 3):
    producer_worker = Producer(kwargs={'queue': q}, name=str(i))
    producer_pool.append(producer_worker)
    producer_worker.start()

# Start four consumers, named '1' through '4'.
for i in range(1, 5):
    consumer_worker = Consumer(kwargs={'queue': q}, name=str(i))
    consumer_pool.append(consumer_worker)
    consumer_worker.start()

# Keep prompting until the operator answers 'Y', then shut the workers down.
while True:
    try:
        control_process = raw_input('> Y/N: ')
    except (EOFError, KeyboardInterrupt):
        # Closed stdin or Ctrl-C: treat it as a shutdown request. Without
        # this the main thread would die while the workers, which are not
        # marked as daemon threads, keep the process alive indefinitely.
        control_process = 'Y'
    if control_process == 'Y':
        for producer in producer_pool:
            producer.running = False
            # Wait for the producer to finish so nothing more is enqueued.
            producer.join()
        for consumer in consumer_pool:
            # Consumers exit on their own once the queue has been drained.
            consumer.producer_alive = False
        break
producer 和 consumer,同时又有一个 IO 阻塞器,那么你在这里如何控制你的 parent? - gsb-eng