Python多进程池重试

13

如果原始计算失败,是否有一种使用简单的池来重新发送数据进行处理的方法?

import random
from multiprocessing import Pool

def f(x):
   if random.getrandbits(1):
       raise ValueError("Retry this computation")
   return x*x

p = Pool(5)
# If one of these f(x) calls fails, retry it with another (or same) process
p.map(f, [1,2,3])

1
也许你想要 return f(x) 而不是引发 ValueError?只是猜测... - Paulo Freitas
你实际应用中出现失败的几率有多高?也就是说,进程立即重试与等待其他进程先完成相比,哪一个更重要? - Isaac
这是一个失败的中等几率,不需要立即重试(但应该最终并行重试)。 - atp
2个回答

23

如果你能(或者不介意)立即重试,可以使用一个装饰器来包装这个函数:

import random
from multiprocessing import Pool
from functools import wraps

def retry(f):
    @wraps(f)
    def wrapped(*args, **kwargs):
        while True:
            try:
                return f(*args, **kwargs)
            except ValueError:
                pass
    return wrapped

@retry
def f(x):
    if random.getrandbits(1):
        raise ValueError("Retry this computation")
    return x*x

p = Pool(5)
# If one of these f(x) calls fails, retry it with another (or same) process
p.map(f, [1,2,3])

10

你可以使用一个队列将失败反馈到初始化进程中的循环中,以便将其传递回中:

import multiprocessing as mp
import random

def f(x):
    if random.getrandbits(1):
        # on failure / exception catch
        f.q.put(x)
        return None
    return x*x

def f_init(q):
    f.q = q

def main(pending):
    total_items = len(pending)
    successful = []
    failure_tracker = []

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, pending)
    retry_results = []
    while len(successful) < total_items:
        successful.extend([r for r in results if not r is None])
        successful.extend([r for r in retry_results if not r is None])
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        if failed_items:
            failure_tracker.append(failed_items)
            retry_results = p.imap(f, failed_items);
    p.close()
    p.join()

    print "Results: %s" % successful
    print "Failures: %s" % failure_tracker

if __name__ == '__main__':
    main(range(1, 10))

输出结果如下:

Results: [1, 4, 36, 49, 25, 81, 16, 64, 9]
Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []]

一个Pool不能在多个进程之间共享。因此采用这种基于Queue的方法。如果你尝试将一个池作为参数传递给池进程,你会得到这个错误:

NotImplementedError: pool objects cannot be passed between processes or pickled

为了避免同步开销,你可以尝试在函数f内进行几次立即重试。这主要取决于你的函数需要等待多久才重试,以及如果立即重试成功的可能性有多大。


旧答案:为了完整起见,这是我的旧答案,它不像直接重新提交到池中那样优化,但根据用例可能仍然相关,因为它提供了一种处理/限制n-级重试的自然方法:

你可以使用一个Queue来聚合失败并在每次运行结束时重新提交,跨多个运行:

import multiprocessing as mp
import random


def f(x):
    if random.getrandbits(1):
        # on failure / exception catch
        f.q.put(x)
        return None
    return x*x

def f_init(q):
    f.q = q

def main(pending):
    run_number = 1
    while pending:
        jobs = pending
        pending = []

        q = mp.Queue()
        p = mp.Pool(None, f_init, [q])
        results = p.imap(f, jobs)
        p.close()

        p.join()
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        successful = [r for r in results if not r is None]
        print "(%d) Succeeded: %s" % (run_number, successful)
        print "(%d) Failed:    %s" % (run_number, failed_items)
        print
        pending = failed_items
        run_number += 1

if __name__ == '__main__':
    main(range(1, 10))

输出结果如下:

(1) Succeeded: [9, 16, 36, 81]
(1) Failed:    [2, 1, 5, 7, 8]

(2) Succeeded: [64]
(2) Failed:    [2, 1, 5, 7]

(3) Succeeded: [1, 25]
(3) Failed:    [2, 7]

(4) Succeeded: [49]
(4) Failed:    [2]

(5) Succeeded: [4]
(5) Failed:    []

我更新了我的答案,现在不需要多次运行,并且可以在同一个原始池上工作。 - Preet Kukreti
感谢详细的回复。我喜欢将失败的计算放入队列中进行重试的想法。我必须授予 Andrew 奖励,因为他的解决方案进行了简单的重试。 - atp
@ash 我在我的回复中提到了立即重试,认为这将是一个微不足道/简单的添加,而不是你要寻找的内容。请注意,它(立即重试)并不适用于所有情况,特别是那些立即重试成功的机会很低的情况(在这种情况下,它非常不优秀,因为它会导致潜在成功的作业资源匮乏)。无论如何,祝贺安德鲁。 - Preet Kukreti

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接