如何在Python多进程池中运行清理代码?

8

我有一些Python代码(在Windows上),使用多进程模块运行一个工作进程池。每个工作进程需要在map_async方法结束时进行一些清理。

有谁知道如何做到这一点吗?


2
我来晚了,但你可以参考这个答案来实现。 - dano
2个回答

4
你真的想为每个工作进程运行一次清理函数,而不是为map_async调用创建的每个任务运行一次吗? multiprocess.pool.Pool创建了一个池,例如8个工作进程。 map_async可能会提交40个任务以在8个工作进程之间分配。 我可以想象为什么您可能希望在每个任务结束时运行清理代码,但我很难想象为什么您希望在每个8个工作进程被完成之前运行清理代码。
尽管如此,如果这就是你想做的事情,你可以通过猴子补丁multiprocessing.pool.worker来实现:
import multiprocessing as mp
import multiprocessing.pool as mpool
from multiprocessing.util import debug

def cleanup():
    print('{n} CLEANUP'.format(n=mp.current_process().name))

# This code comes from /usr/lib/python2.6/multiprocessing/pool.py,
# except for the single line at the end which calls cleanup().
def myworker(inqueue, outqueue, initializer=None, initargs=()):
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    while 1:
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        put((job, i, result))
    cleanup()

# Here we monkeypatch mpool.worker
mpool.worker=myworker

def foo(i):
    return i*i

def main():
    pool = mp.Pool(8)
    results = pool.map_async(foo, range(40)).get()
    print(results)

if __name__=='__main__':
    main()

yields:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521]
PoolWorker-8 CLEANUP
PoolWorker-3 CLEANUP
PoolWorker-7 CLEANUP
PoolWorker-1 CLEANUP
PoolWorker-6 CLEANUP
PoolWorker-2 CLEANUP
PoolWorker-4 CLEANUP
PoolWorker-5 CLEANUP

嗨,看起来这正是我需要的。我不明白为什么我们在创建池时有一个初始化方法,却没有终结器方法。关于为什么我需要终结器,是因为我启动了一些工作进程,然后会从win32com启动一个Excel会话,并处理不同会话中的所有队列项。我的问题是,当工作进程完成时,它们不会关闭Excel会话。 - Dave
@unutbu: 抱歉,但是我不明白,map() 函数中没有任何初始化或结束过程的方法。传递作为参数的函数只是一项工作。可以做的一件事是添加一个哨兵(比如 None),它会正确地关闭 Excel 会话。 - Dave
@unubtu:感谢您提供的出色答案。我认为这不仅是一种解决方法,而且应该能够解决我的问题。我还认为,如果我想更好地控制工作进程,可能需要花更多时间使用原始进程而不是使用非常方便的池。无论如何,有了您最后的建议,我仍然可以享受池的便利性;) - Dave
@unubtu:pool.map 会确保每个工作进程都运行 shutdown_sessionsstart_sessions 吗? - vin
2
@vin:感谢您指出这一点。您是正确的——如果调用pool.map(shutdown_sessions, ...),则不能保证每个工作进程都会运行shutdown_sessions。相反,似乎运行终结器的最佳方法是使用dano的答案 - unutbu
@unubtu:感谢你指出这个答案,正是我在寻找的! - vin

2

你唯一真正的选择是在你使用map_async函数结尾运行清理操作。

如果这个清理操作是为了在进程结束时进行,那么你不能使用池的概念。它们是相互独立的。除非你使用Python 2.7中新加入的maxtasksperchild,否则池不会决定进程的生命周期。即使你使用了maxtasksperchild,也不能使你在进程结束时运行代码。但是,maxtasksperchild可能适合你,因为进程打开的任何资源都会在进程终止时消失。

话虽如此,如果你有一堆需要运行清理操作的函数,你可以通过设计一个装饰器来避免重复劳动。下面是一个例子:

import functools
import multiprocessing

def cleanup(f):
    """Decorator for shared cleanup mechanism"""
    @functools.wraps(f)
    def wrapped(arg):
        result = f(arg)
        print("Cleaning up after f({0})".format(arg))
        return result
    return wrapped

@cleanup
def task1(arg):
    print("Hello from task1({0})".format(arg))
    return arg * 2

@cleanup
def task2(arg):
    print("Bonjour from task2({0})".format(arg))
    return arg ** 2

def main():
    p = multiprocessing.Pool(processes=3)
    print(p.map(task1, [1, 2, 3]))
    print(p.map(task2, [1, 2, 3]))

if __name__ == "__main__":
    main()

当你执行这个代码时(假设没有混乱的标准输出,因为出于简洁起见我在这里没有锁定它),你得到的顺序应该表明你的清理任务正在每个任务结束时运行:
Hello from task1(1)
Cleaning up after f(1)
Hello from task1(2)
Cleaning up after f(2)
Hello from task1(3)
Cleaning up after f(3)
[2, 4, 6]

Bonjour from task2(1)
Cleaning up after f(1)
Bonjour from task2(2)
Cleaning up after f(2)
Bonjour from task2(3)
Cleaning up after f(3)
[1, 4, 9]

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接