Python多进程管道“死锁”问题

4
我遇到了以下示例代码的问题:
from multiprocessing import Lock, Process, Queue, current_process

def worker(work_queue, done_queue):
    for item in iter(work_queue.get, 'STOP'):
            print("adding ", item, "to done queue")
            #this works: done_queue.put(item*10)
            done_queue.put(item*1000) #this doesnt!
    return True

def main():
    workers = 4
    work_queue = Queue()
    done_queue = Queue()
    processes = []

    for x in range(10):
        work_queue.put("hi"+str(x))

    for w in range(workers):
        p = Process(target=worker, args=(work_queue, done_queue))
        p.start()
        processes.append(p)
        work_queue.put('STOP')

    for p in processes:
        p.join()

    done_queue.put('STOP')

    for item in iter(done_queue.get, 'STOP'):
        print(item)


if __name__ == '__main__':
    main()

当完成队列变得足够大时(我认为限制约为64k),整个程序会在没有任何进一步通知的情况下冻结。

当队列变得过大时,一般的方法是什么?是否有一种方法可以在处理完元素后即时将其删除?Python文档建议删除 p.join(),然而在实际应用中,我无法估计进程何时完成。除了无限循环和使用 .get_nowait() 之外,是否有简单的解决方案来解决这个问题?


这对我在CPython 2.6、2.7、3.0、3.1、3.2、3.3和3.4alpha4上都有效。2.5不包括多进程模块。你使用的Python版本是什么? - dstromberg
我正在使用3.3版本。尝试将数字从1000增加到更高的值,管道大小限制取决于操作系统。 - Stefan
2
你在文档中看到过 "这意味着每当你使用队列时,你需要确保在进程加入之前所有已经放入队列的项目最终都会被移除。" 吗?甚至有一个例子代码会导致死锁。在调用 p.join() 之前,done_queue 必须为空。移除 p.join()。在工作器中添加 try: ... finally: done_queue.put('STOP') 并重复执行 iter(done_queue.get, 'STOP') 循环 len(processes) 次。 - jfs
使用 range(len(processes)+1) 时似乎能够工作,谢谢。 - Stefan
1
@Stefan:你应该在主进程中放弃 done_queue.put('STOP'),然后 len(processes) 次就足够了。顺便问一下,为什么不在这种情况下使用 Pool 呢?(https://gist.github.com/fc0ad934e903a5fac5ae) - jfs
1个回答

2

这对我来说在3.4.0alpha4、3.3、3.2、3.1和2.6上工作正常。但在2.7和3.0上会出现回溯(traceback)的情况。顺便一提,我已经使用pylint检查过了。

#!/usr/local/cpython-3.3/bin/python

'''SSCCE for a queue deadlock'''

import sys
import multiprocessing

def worker(workerno, work_queue, done_queue):
    '''Worker function'''
    #reps = 10 # this worked for the OP
    #reps = 1000 # this worked for me
    reps = 10000 # this didn't

    for item in iter(work_queue.get, 'STOP'):
        print("adding", item, "to done queue")
        #this works: done_queue.put(item*10)
        for thing in item * reps:
            #print('workerno: {}, adding thing {}'.format(workerno, thing))
            done_queue.put(thing)
    done_queue.put('STOP')
    print('workerno: {0}, exited loop'.format(workerno))
    return True

def main():
    '''main function'''
    workers = 4
    work_queue = multiprocessing.Queue(maxsize=0)
    done_queue = multiprocessing.Queue(maxsize=0)
    processes = []

    for integer in range(10):
        work_queue.put("hi"+str(integer))

    for workerno in range(workers):
        dummy = workerno
        process = multiprocessing.Process(target=worker, args=(workerno, work_queue, done_queue))
        process.start()
        processes.append(process)
        work_queue.put('STOP')

    itemno = 0
    stops = 0
    while True:
        item = done_queue.get()
        itemno += 1
        sys.stdout.write('itemno {0}\r'.format(itemno))
        if item == 'STOP':
            stops += 1
            if stops == workers:
                break
    print('exited done_queue empty loop')


    for workerno, process in enumerate(processes):
        print('attempting process.join() of workerno {0}'.format(workerno))
        process.join()

    done_queue.put('STOP')

if __name__ == '__main__':
    main()

HTH


谢谢您的回答,但是在研究了一下池之后,它似乎是这个问题的更简单的解决方案。 - Stefan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接