如何获取传递给multiprocessing.Process的函数的返回值?

351
在下面的示例代码中,我想要获取函数worker的返回值。我该如何做到这一点?这个值存储在哪里?
import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

我似乎找不到存储在“jobs”中的对象中的相关属性。
13个回答

345
使用一个共享变量进行通信。例如,像这样, 示例代码:
import multiprocessing


def worker(procnum, return_dict):
    """worker function"""
    print(str(procnum) + " represent!")
    return_dict[procnum] = procnum


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(return_dict.values())

输出:

0 represent!
1 represent!
3 represent!
2 represent!
4 represent!
[0, 1, 3, 2, 4]

76
我建议在这里使用 multiprocessing.Queue,而不是使用 Manager。使用 Manager 需要启动一个全新的进程,这在使用 Queue 时有些过度了。 - dano
6
@dano: 我想知道,如果我们使用Queue()对象,我们无法确定每个进程返回值的顺序。我的意思是,如果我们需要结果的顺序来进行下一步工作,我们如何确切地知道哪个输出来自于哪个进程? - Chau Pham
8
你可以从每个进程返回一个元组,其中一个值是你关心的实际返回值,另一个值是来自该进程的唯一标识符。但我也想知道为什么你需要知道哪个进程返回了哪个值。如果这是你实际需要了解的进程信息,还是需要将输入列表和输出列表进行相关联?在这种情况下,我建议使用multiprocessing.Pool.map来处理你的工作项列表。 - dano
26
对于只有一个参数的函数,请使用 args=(my_function_argument, ),注意这里要加上 , 逗号!否则 Python 会抱怨“缺少位置参数”。我花了10分钟才想明白。还要检查手册用法(在“进程类”部分下) 。 - yuqli
6
使用multiprocessing.Manager()字典的缺点之一是它会pickle(序列化)返回的对象,因此存在由pickle库引起的瓶颈,最大返回对象大小为2GiB。是否有其他方法可以避免返回对象的序列化? - hirschme
显示剩余8条评论

96
我认为sega_sai提出的方法是更好的。但是它确实需要一个代码示例,所以我来写一个:
import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print(pool.map(worker, range(5)))

这将打印返回值:
I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

如果你熟悉Python 2中的内置函数map,这应该不会太难。否则,请查看sega_Sai的链接
请注意所需的代码量很少。(还请注意如何重复使用进程。)

2
有什么想法为什么我的 getpid() 返回相同的值?我正在运行Python3。 - zelusp
我不确定 Pool 如何在工作进程之间分配任务。如果它们非常快,它们是否都会最终分配到同一个工作进程?这种情况是否一致发生?另外,如果添加延迟呢? - Mark
1
那我不确定。我认为为此开设一个单独的问题会很有趣。 - Mark
1
如果您想将不同的功能发送到每个进程,请使用“pool.apply_async”:https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult - Kyle
如果您使用此方法,请确保返回值与Pickle兼容。否则,您将会遇到“不可序列化”的错误:https://medium.com/@jwnx/multiprocessing-serialization-in-python-with-pickle-9844f6fa1812 - mrdaliri
显示剩余9条评论

67

对于其他需要使用QueueProcess中获取值的人:

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join()
    print(queue.get())  # Prints {"foo": True}

请注意,在Windows或Jupyter Notebook中,使用多线程时,您需要将其保存为文件并执行该文件。如果您在命令提示符中执行此操作,则会看到以下错误:

 AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>

2
当我在工作进程中将某物放入队列时,我的join永远不会被触发。你有任何想法这可能是如何发生的吗? - Laurens Koppenol
7
是的,它无限地挂起来。我的工人们都完成了(工作函数内的循环结束后,后面的打印语句会被打印,对于所有工人)。join() 没有起到任何作用。如果我从函数中删除 Queue,就可以让我通过 join() - Laurens Koppenol
命名有点奇怪。顶部的 ret 应该被称为 send,因为你正在将它发送到进程中。虽然只通过 Processargs 参数传递可能更简单?此外,如果你只使用 putget,请使用 SimpleQueue。尽管这段代码在 Py 3.6 上对我卡住了。 - Chris
@MatthewMoisen 感谢你的回答,非常有用,谢谢 - 我将它复制到了我的下面。只是想把外部变量称为 send,这样代码可能会更易读一点,因为它正在被发送,而 ret 遮盖了内部变量 ret,这是一个不同的变量,并且实际上被返回。除非在并行执行期间移动值,否则您还可以向 args=(queue,) 元组添加变量而不是发送。 - Chris
1
@Bendemann 有人编辑了答案,把 queue.get 放在 queue.join 之前,导致答案不正确。我已经通过将 queue.get 放在 p.join 之后来修复它。请再试一次。 - Matthew Moisen
显示剩余9条评论

46

由于某种原因,我在任何地方都找不到使用Queue进行此操作的通用示例(即使Python文档示例也没有生成多个进程),因此这是我尝试了10次后成功的做法:

from multiprocessing import Process, Queue

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queue是一个阻塞的、线程安全的队列,用于存储子进程的返回值。因此您必须将队列传递给每个进程。这里不太明显的是,在加入Process之前,您必须从队列中get()数据,否则队列会填满并阻塞一切。

针对面向对象编程的更新(在Python 3.4中测试):

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)

40
这个例子展示了如何使用一个multiprocessing.Pipe实例的列表来从任意数量的进程中返回字符串。
import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print result
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print result_list

if __name__ == '__main__':
    main()

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

这个解决方案使用的资源比使用multiprocessing.Queue的资源要少

  • 一个管道
  • 至少一个锁
  • 一个缓冲区
  • 一个线程

或者使用multiprocessing.SimpleQueue的资源要少

  • 一个管道
  • 至少一个锁

查看每种类型的源代码非常有教育意义。


我将所有的全局数据和代码放入一个主函数中,它的运行效果是一样的。这回答了你的问题吗? - user3657941
管道中是否必须先读取之前的值,才能添加(发送)新的值? - Nickpick
5
如果返回的对象很大,这个答案会导致死锁。不要先执行proc.join(),而是首先尝试接收(recv())返回值,然后再执行join。 - L. Pes
1
我和@L.Pes的想法一致。这可能是特定于操作系统的问题,但我已经根据我的用例进行了调整,当工作人员尝试使用send_end.send(result)发送大量结果时,程序会无限期地挂起。在接收后加入可以解决这个问题。如果N=2对您来说太具有个人偏见,我很乐意提供一个例子。 - Vlad
管道很棒!它们易于使用和理解 :) - Vaidøtas I.
显示剩余4条评论

18

我有TensorFlow代码,其中multiprocessing.Pool会挂起,但是multiprocessing.Process不会。 - Le Frite

16
你可以使用内置的exit命令来设置进程的退出代码。可以从进程的exitcode属性中获取它。
import multiprocessing

def worker(procnum):
    print str(procnum) + ' represent!'
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print result

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]

4
请注意,这种方法可能会变得令人困惑。如果处理过程没有错误,通常应该使用退出代码0退出。如果您有任何监控系统进程退出代码的内容,则可能会将其报告为错误。 - ferrouswheel
2
如果您只想在父进程中出现错误时引发异常,则非常完美。 - crizCraig

11

pebble包提供了一个很好的抽象,利用了multiprocessing.Pipe,使得这个过程非常简单:

from pebble import concurrent

@concurrent.process
def function(arg, kwarg=0):
    return arg + kwarg

future = function(1, kwarg=1)

print(future.result())

示例来源:https://pythonhosted.org/Pebble/#concurrent-decorators


11

我来简化一下上面复制的最简单的例子,这些对我在Py3.6上有效。最简单的是multiprocessing.Pool

import multiprocessing
import time

def worker(x):
    time.sleep(1)
    return x

pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))

您可以使用Pool(processes=5)设置进程池中的进程数。但是,它默认为 CPU 数量,因此对于CPU密集型任务,请将其保留为空白。(I/O密集型任务通常适合线程,因为线程大多数时间都在等待,因此可以共享CPU核心。)Pool还应用了分块优化
(请注意,工作方法不能嵌套在方法内部。我最初将我的工作方法定义在调用pool.map的方法内部,以使其全部自包含,但是然后进程无法导入它,并抛出"AttributeError:无法pickle本地对象outer_method..inner_method"。更多信息在这里。它可以在类内部。)
(感谢原问题指定打印'represent!'而不是time.sleep(),但没有它,我认为某些代码正在并发运行,但实际上并没有。)
Py3的ProcessPoolExecutor也只需要两行代码(.map返回一个生成器,因此您需要list())。
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
    print(list(executor.map(worker, range(10))))

使用普通的Process

import multiprocessing
import time

def worker(x, queue):
    time.sleep(1)
    queue.put(x)

queue = multiprocessing.SimpleQueue()
tasks = range(10)

for task in tasks:
    multiprocessing.Process(target=worker, args=(task, queue,)).start()

for _ in tasks:
    print(queue.get())

如果你只需要使用putget,那么请使用SimpleQueue。第一个循环启动所有进程,在第二个循环中进行阻塞的queue.get调用。我认为没有必要调用p.join()


2
您可以使用 ProcessPoolExecutor 来从函数中获取返回值,如下所示:
from concurrent.futures import ProcessPoolExecutor

def test(num1, num2):
    return num1 + num2

with ProcessPoolExecutor() as executor:
    feature = executor.submit(test, 2, 3)
    print(feature.result()) # 5

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接