如何在Python中终止异步starmap多进程池,一旦任何进程遇到错误?

3

我正在使用Python的multiprocessing库中的starmap_async函数。 但是,如果我的代码在某个进程中遇到错误,则不会抛出异常,直到所有进程都完成。以下是相关的代码:

from multiprocessing import Pool, cpu_count
import datetime
import itertools
import time

       with Pool(max(cpu_count()//2, 1)) as p:
            #p = Pool()
            df_iter = df_options.iterrows()
            ir = itertools.repeat
            results = p.starmap_async(_run,zip(df_iter,ir(fixed_options),ir(outputs_grab)), chunksize=1)
            p.close() #no more jobs to submit
            
            #Printing progress
            
            n_remaining = results._number_left + 1
            while (not results.ready()):
                time.sleep(1)
                #Check for errors here ... How ????
                #Then what? call terminate()????? 
                if verbose:
                   if results._number_left < n_remaining:
                      now = datetime.datetime.now()
                      n_remaining = results._number_left
                      print('%d/%d  %s' % (n_remaining,n_rows,str(now)[11:]))
                    
            print('joining')
            p.join()
            
            all_results = results.get()
            
        df = pd.DataFrame(all_results)

目前,如果我在生成的进程中引发错误,似乎其他进程不仅会完成运行,而且还会开始新任务,尽管其中一个调用出现错误。

一些搜索使我相信这可能是不可能的。有人似乎建议我改用 concurrent.futures,尽管如何将我的示例映射到该示例仍不清楚,特别是在保持实时反馈完成进程的同时。

concurrent.futures 的讨论: https://dev59.com/9mw15IYBdhLWcg3wQ5di#47108581

1个回答

2
简而言之,使用imap_unordered可以使主进程在子进程抛出异常时具有最小的延迟,因为它允许您通过结果Queue立即在主进程中处理结果。如果需要,您可以使用包装函数构建自己的“星形”版本的函数。作为代码设计的一个要点,大多数Pool方法倾向于重新引发来自子进程的异常,而concurrent.futures则倾向于设置返回值的属性以指示引发的异常。
from random import random
from functools import partial
from multiprocessing import Pool
from time import sleep

def foo(a, b):
    sleep(random()) #introduce some processing delay to simulate work
    if random() > .95:
        raise Exception("randomly rasied an exception")
    else:
        return f"{a}\t{b}"

def star_helper(func, args):
    return func(*args)

if __name__ == "__main__":
    n = 20
    print("chance of early termination:", (1-.95**n)*100, "%")
    with Pool() as p:
        try:
            for result in p.imap_unordered(partial(star_helper, foo), zip(range(n), range(n))):
                print(result)
        except:
            p.terminate()
            print("terminated")
    print("done") # `with Pool()` joins the child processes to prove they quit early

只是补充一下,如果你选择这条路线,那么结果将不会被排序。如果你想保留顺序,请使用 pool.apply_async 逐个发送它们(无阻塞),这样你就可以像 pool.imap_unordered 一样跟踪结果的到来。 - Charchit Agarwal
我特别使用了imap_unordered来最小化延迟(主进程不等待正确的有序输出来生成下一个循环迭代),但常规的imap也可以保持顺序,并基于chunksize和工作器数量给出给定的最小延迟。 - Aaron
1
非常好!我做了两个小修改。在终止后,我会引发最后一个错误,这是我不得不查找的,只需使用raise即可。我还添加了一个枚举,以便在进度进行时可以获得计数器以打印进度。谢谢! - Jimbo

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接