Dask: 如果一个任务失败,如何继续其他任务

3
我有一个简单(但很大)的任务图在Dask中。以下是代码示例。
results = []

for params in SomeIterable:
    a = dask.delayed(my_function)(**params)
    b = dask.delayed(my_other_function)(a)
    results.append(b)

dask.compute(**results)



在这里,SomeIterable 是一个 dict 列表,其中每个都是传递给 my_function 的参数。在每次迭代中,b 依赖于 a,因此如果生成 a 的任务失败,则无法计算 b。但是,results 的每个元素都是独立的,因此我希望如果其中一项失败,另一项可以继续运行。实际情况并非如此,如果 results 的一个元素失败,则脚本的执行会结束。

编辑:

使用客户端类 dask.distributed.Clientsubmit (或map)方法也会发生这种情况,例如

futures = [client.submit(my_other_function_2, **params) for params in MyOtherIterable]
results = [ft.result() for ft in futures]

在上面的代码中,如果我尝试收集结果时有一个任务失败,所有代码都会失败,就像docs中所述。

你不想使用 client.submit API 有具体的原因吗?(据我所知,在底层它们是等效的) - SultanOrazbayev
1个回答

2

一个简单的解决方法是在您的函数中使用try/except,就像这样:

def try_f(params):
    try:
        a = my_function(**params)
        b = my_other_function(a)
    except:
        b = f"Failed for: {params}"
    return b

results = [dask.delayed(try_f)(params) for params in SomeIterable]
computed = dask.compute(results)

但是,根据您的情况,您可能想使用client.submit API,因为它会给您一些进一步的灵活性,例如指定一些条件重试。


1
谢谢,最后我就是这么做的哈哈。 - Andrex
1
如果那对您可行,@Andrex,那么您应该接受它 :-) - Wes Hardaker

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接