使用更新队列和输出队列的Python多进程技术

8
我该如何编写一个Python多进程脚本,使用以下两个队列?
1. 一个作为工作队列,它从一些数据开始,并根据要并行化的函数的条件,在运行时接收更多任务。 2. 另一个用于收集结果,并在处理完成后将结果写入其中。
基本上,我需要根据初始项中发现的内容向工作队列中添加更多任务。下面我发布的示例很愚蠢(我可以按照自己的意愿转换项目并直接放入输出队列),但其机制是清晰的,并反映了我需要开发的部分概念。这是我的尝试:
import multiprocessing as mp

def worker(working_queue, output_queue):
    item = working_queue.get() #I take an item from the working queue
    if item % 2 == 0:
        output_queue.put(item**2) # If I like it, I do something with it and conserve the result.
    else:
        working_queue.put(item+1) # If there is something missing, I do something with it and leave the result in the working queue 

if __name__ == '__main__':
    static_input = range(100)    
    working_q = mp.Queue()
    output_q = mp.Queue()
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())] #I am running as many processes as CPU my machine has (is this wise?).
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    for result in iter(output_q.get, None):
        print result #alternatively, I would like to (c)pickle.dump this, but I am not sure if it is possible.

这不会结束也不会打印任何结果。

在整个过程结束时,我希望确保工作队列为空,并且所有并行函数在后面迭代输出队列以取出结果之前都已经完成写入。你有什么建议可以使其正常工作吗?

2个回答

5
以下代码实现了预期结果。它遵循@tawmas所提出的建议。
该代码允许在需要工作进程可以在处理过程中更新将数据提供给工作者的队列的情况下使用多个核心。
import multiprocessing as mp
def worker(working_queue, output_queue):
    while True:
        if working_queue.empty() == True:
            break #this is the so-called 'poison pill'    
        else:
            picked = working_queue.get()
            if picked % 2 == 0: 
                    output_queue.put(picked)
            else:
                working_queue.put(picked+1)
    return

if __name__ == '__main__':
    static_input = xrange(100)    
    working_q = mp.Queue()
    output_q = mp.Queue()
    results_bank = []
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    results_bank = []
    while True:
       if output_q.empty() == True:
           break
       results_bank.append(output_q.get_nowait())
    print len(results_bank) # length of this list should be equal to static_input, which is the range used to populate the input queue. In other words, this tells whether all the items placed for processing were actually processed.
    results_bank.sort()
    print results_bank

当结果队列为空时,您的打印循环会一直等待而无法退出。您应该使用get_nowait并明确捕获Empty异常以实现清洁退出。 - tawmas
再次感谢您的帮助。我正在尝试以下代码:try: print result except Empty: break这将打印预期的总结果,但控制台输出仍在抱怨异常。我认为我没有正确处理它。 - Jaqo
你需要在try块内从队列中获取。 - tawmas
亲爱的@tawmas,您的帮助非常有用。再次感谢。我无法指定异常。相反,我使用了一个while True循环,并在队列为空时进行了检查。这应该是可靠的,因为只有一个cpu核心执行此任务。我已经使用您找到的解决方案编辑了上面答案中的代码。 - Jaqo

3
在创建进程的行中,您有一个错别字。应该是mp.Process而不是mp.process。这就是导致您遇到异常的原因。
另外,您没有在工作进程中循环,因此它们实际上只从队列中消耗了一个项,然后退出。如果不了解所需逻辑的更多信息,很难给出具体建议,但您可能需要将worker函数的主体放在while True循环中,并在主体中添加一个条件以在完成工作时退出。
请注意,如果您没有添加明确退出循环的条件,则当队列为空时,您的工作进程将永远停滞。您可以考虑使用所谓的“毒丸”技术来通知工作进程可以退出。您将在PyMOTW文章Communication Between processes中找到一个示例和一些有用的讨论。
至于要使用的进程数,您需要进行一些基准测试以找到适合您的数量,但通常情况下,当您的工作负载为CPU绑定时,每个核心一个进程是一个好的起点。如果您的工作负载是IO绑定的,则使用更多的工作进程可能会获得更好的结果。

不客气!请注意,在您进行操作时,我已经编辑了我的回复以开始回答您问题的其余部分。 - tawmas
我刚刚读了你回答的剩余部分。我将尝试应用while True循环。如果队列中没有更多要处理的项目,这个过程是否会结束呢?我想使用类似于队列长度的东西,但文档说明这不是可靠的。 - Jaqo
给你一些更多的细节! - tawmas
我正在尝试将所谓的“毒丸”放置在JoinableQueue的初始任务末尾。但是随着这个队列在多进程中获取更多任务,毒丸会出现在一些必须被处理的任务之前。我是否应该检查“锁定”,即使这可能影响性能? - Jaqo
1
实际上,我的工人们根据他们在工作中发现的情况排队更多的任务。我尝试了您关于使每个工人在没有更多工作可用时发布毒丸的建议。我只使用了不可靠的.empty()方法;在最坏的情况下,我会得到一个单核心在其他工人未能完成的任务末尾工作的结果。我打印出了我的结果,但控制台中的"In"行不再出现。你知道为什么吗?我更新了代码并将您的代码设置为正确答案。非常感谢您的帮助。 - Jaqo
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接