来自joblib的中间结果

5

我正在尝试学习joblib模块,作为Python内置的multiprocessing模块的替代品。我通常使用multiprocessing.imap在可迭代对象上运行函数,并在结果返回时返回结果。在这个最小工作示例中,我无法弄清如何使用joblib:

import joblib, time

def hello(n):
    time.sleep(1)
    print "Inside function", n
    return n

with joblib.Parallel(n_jobs=1) as MP:

    func = joblib.delayed(hello)
    for x in MP(func(x) for x in range(3)):
        print "Outside function", x

这将打印:

Inside function 0
Inside function 1
Inside function 2
Outside function 0
Outside function 1
Outside function 2

我希望看到输出结果:

Inside function 0
Outside function 0
Inside function 1
Outside function 1
Inside function 2
Outside function 2

或者类似的内容,表明可迭代对象MP(...)不会等待所有结果完成。对于更长的演示,请将n_jobs=-1range(100)进行更改。

3个回答

6

stovfl的回答很优雅,但它只适用于派遣的第一批任务。在这个例子中,它有效是因为工人们从未挨饿(n_tasks < 2*n_jobs)。要使这种方法有效,必须还调用最初传递给apply_async的回调函数。这是BatchCompletionCallBack的一个实例,用于安排下一批要处理的任务。

一种可能的解决方案是将任意回调封装在可调用对象中,像这样(在joblib==0.11、py36中测试通过):

from joblib._parallel_backends import MultiprocessingBackend
from joblib import register_parallel_backend, parallel_backend
from joblib import Parallel, delayed
import time

class MultiCallback:
    def __init__(self, *callbacks):
        self.callbacks = [cb for cb in callbacks if cb]

    def __call__(self, out):
        for cb in self.callbacks:
            cb(out)

class ImmediateResultBackend(MultiprocessingBackend):
    def callback(self, result):
        print("\tImmediateResult function %s" % result)

    def apply_async(self, func, callback=None):
        cbs = MultiCallback(callback, self.callback)
        return super().apply_async(func, cbs)

register_parallel_backend('custom', ImmediateResultBackend)

def hello(n):
    time.sleep(1)
    print("Inside function", n)
    return n

with parallel_backend('custom'):
    res = Parallel(n_jobs=2)(delayed(hello)(y) for y in range(6))

输出

Inside function 0
Inside function 1
    ImmediateResult function [0]
    ImmediateResult function [1]
Inside function 3
Inside function 2
    ImmediateResult function [3]
    ImmediateResult function [2]
Inside function 4
    ImmediateResult function [4]
Inside function 5
    ImmediateResult function [5]

非常好用,谢谢!顺便说一下:callback 中的结果将是一个结果列表。此外,如果您想捕获当前活动的后端,可以让 ImmediateResultBackend 继承自 type(joblib.parallel.get_active_backend()[0])。像这样:class ImmediateResultBackend(type(joblib.parallel.get_active_backend()[0])): ...。最后,可能有意义的是使用 del joblib.parallel.BACKENDS["custom"] 取消注册后端。 - Martin Becker

3
为了立即获得joblib的结果,例如:
from joblib._parallel_backends import MultiprocessingBackend

class ImmediateResult_Backend(MultiprocessingBackend):
    def callback(self, result):
        print("\tImmediateResult function %s" % (result))

    # Overload apply_async and set callback=self.callback
    def apply_async(self, func, callback=None):
        applyResult = super().apply_async(func, self.callback)
        return applyResult

joblib.register_parallel_backend('custom', ImmediateResult_Backend, make_default=True)

with joblib.Parallel(n_jobs=2) as parallel:
    func = parallel(delayed(hello)(y) for y in range(3))
    for f in func:
        print("Outside function %s" % (f))

输出
注意:在def hello(...)中使用了time.sleep(n * random.randrange(1,5)),因此processes变成了不同的就绪状态。

函数内部 0
函数内部 1
立即结果函数 [0]
函数内部 2
立即结果函数 [1]
立即结果函数 [2]
函数外部 0
函数外部 1
函数外部 2

使用 Python:3.4.2 - joblib:0.11 进行测试


1
太好了,谢谢!我没想到你可以钩入一个新的函数,我不知道该怎么称呼它。需要注意的是(在调查后):“警告:这个函数是实验性的,在未来的joblib版本中可能会发生变化。” - Hooked
@Hooked: change 不意味着在未来版本中 删除。此外,我还可以使用 队列 而不是 hook 来获得即时结果。 - stovfl

-1
>>> import joblib, time
>>> 
>>> def hello(n):
...     time.sleep(1)
...     print "Inside function", n
...     return n
... 
>>> with joblib.Parallel(n_jobs=1) as MP:
...     func = joblib.delayed(hello)
...     res = MP(func(x) for x in range(3))  # This is not an iterator.
... 
Inside function 0
Inside function 1
Inside function 2
>>> type(res)
<type 'list'>

你所处理的不是一个生成器。因此,你不应该期望它会提供中间结果。我在文档中读到的没有提到其他情况(或者我没有阅读相关部分)。
欢迎你阅读文档并搜索“intermediate”结果主题: https://pythonhosted.org/joblib/search.html?q=intermediate&check_keywords=yes&area=default 我的理解是每次调用parallel都是一个障碍,为了获得中间结果,你需要对处理进行分块:
>>> import joblib, time
>>> 
>>> def hello(n):
...     time.sleep(1)
...     print "Inside function", n
...     return n
... 
>>> with joblib.Parallel(n_jobs=1) as MP:
...     func = joblib.delayed(hello)
...     for chunk in range(3):
...         x = MP(func(y) for y in [chunk])
...         print "Outside function", x
... 
Inside function 0
Outside function [0]
Inside function 1
Outside function [1]
Inside function 2
Outside function [2]
>>> 

如果您想深入了解,这里有一个回调机制,但它仅用于进度报告(BatchCompletionCallBack),但您需要进行更多的代码更改。

我不确定你的代码块与我的有何不同--你所做的只是将赋值res移到了循环语句中并添加了注释。问题仍然存在,我正在寻找一种方法让joblib返回中间结果。你的答案输出与预期输出不匹配,实际上它返回了我所确定的问题! - Hooked
我说过“你的代码等价于”,这是非常清楚的含义。我已经将它改为片段,以使其更清晰易懂。 - dnozay
所以明确一点,使用joblib而不先计算整个结果集是不可能的吗?我理解我们的代码块会生成一个列表,在显示之前必须完成该列表,但可以使用joblib来完成此操作吗?multiprocessing.imap等效于什么?[如果没有别的,这将有助于澄清我的问题] - Hooked

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接