在多进程池的map_async()函数中处理multiprocessing.TimeoutError异常。

5

到目前为止,我做的是这样的:

rets=set(pool.map_async(my_callback, args.hosts).get(60*4))

如果超时,我会收到一个异常:

 File "/usr/lib/python2.7/multiprocessing/pool.py", line 524, in get
    raise TimeoutError
multiprocessing.TimeoutError

我希望优雅地处理这个问题:

对于我可以访问的所有主机,输出应该进入rets,所有超时的主机应该进入另一个列表。

如何做到这一点?

更新

六年后,我认为在并发应用程序中使用go比使用Python更有意义。


try-except显然是解决方案,那么我想知道有什么陷阱? - John Mee
“我可以到达的主机”放在“尝试(try)”中,“所有超时的主机”放在“TimeoutError异常”中……这里有什么难理解的? - NoobEditor
@JohnMee 你说try-except是解决方案。但是怎么做呢?如果我的my_callback()被并行调用了100次,代码中有一行:调用map_async()。据我所知,如果出现异常,rets将为空。如何获取两个列表:未超时调用的结果列表和超时主机列表? - guettli
啊,我明白了,“问题”在于所有的操作都发生在一行代码中。你需要查阅文档,并想办法将其拆分成多行代码,这样你就可以通过一个语句调用一个进程,并且只能捕获该进程的超时异常,而不是其他进程。 - John Mee
1个回答

9
据我所知,你不能使用map_async,或者至少不能完全使用。 map_async是一个方便的方法,用于解决特定用例的特定问题,而这与你想要的不匹配,因为你需要更细致的控制。
然而,你仍然可以做到,只需使用多进程模块中更细粒度的方法即可。特别是,你可以使用apply_async动态地向池添加作业,这样可以更好地控制如何处理单个任务的成功和失败。
下面是一个相当简单的示例,我认为它可以实现你想要的功能:
from multiprocessing.pool import Pool, TimeoutError
from time import sleep, time


def task_function(xx):
    print('Task %d running' % xx)
    sleep(xx)
    print('Task %d ended' % xx)
    return 'Result of task %d' % xx

pl = Pool()
results = [
    pl.apply_async(task_function, (_xx,)) 
    for _xx in range(10)]

start = time()
wait_until = start + 5

rets = []
timed_out_results = []

for res in results:
    timeout = wait_until - time()
    if timeout < 0:
        timeout = 0

    try:
        rets.append(res.get(timeout))
    except TimeoutError:
        timed_out_results.append(res)

print('%s ended' % (rets,))
print('%s timedout' % (timed_out_results,))

这里有10个作业,每个作业都会先打印一行文字,然后sleep,再打印另一行文字。第一个作业sleep 0秒,第二个作业sleep 1秒,第三个作业sleep 2秒,以此类推。我们设置池子在5秒后超时,因此我们期望有5个任务完成且5个任务超时。
请注意,我并没有停止仍在运行的任务,在现实世界中它们可能会继续运行并在打印结果所需的时间内完成。您需要确定对此有多在意并决定如何处理。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接