我遇到了以下示例代码的问题:
from multiprocessing import Lock, Process, Queue, current_process
def worker(work_queue, done_queue):
for item in iter(work_queue.get, 'STOP'):
print("adding ", item, "to done queue")
#this works: done_queue.put(item*10)
done_queue.put(item*1000) #this doesnt!
return True
def main():
workers = 4
work_queue = Queue()
done_queue = Queue()
processes = []
for x in range(10):
work_queue.put("hi"+str(x))
for w in range(workers):
p = Process(target=worker, args=(work_queue, done_queue))
p.start()
processes.append(p)
work_queue.put('STOP')
for p in processes:
p.join()
done_queue.put('STOP')
for item in iter(done_queue.get, 'STOP'):
print(item)
if __name__ == '__main__':
main()
当完成队列变得足够大时(我认为限制约为64k),整个程序会在没有任何进一步通知的情况下冻结。
当队列变得过大时,一般的方法是什么?是否有一种方法可以在处理完元素后即时将其删除?Python文档建议删除 p.join(),然而在实际应用中,我无法估计进程何时完成。除了无限循环和使用 .get_nowait() 之外,是否有简单的解决方案来解决这个问题?
p.join()
之前,done_queue
必须为空。移除p.join()
。在工作器中添加try: ... finally: done_queue.put('STOP')
并重复执行iter(done_queue.get, 'STOP')
循环len(processes)
次。 - jfsdone_queue.put('STOP')
,然后len(processes)
次就足够了。顺便问一下,为什么不在这种情况下使用Pool
呢?(https://gist.github.com/fc0ad934e903a5fac5ae) - jfs