在concurrent.futures.ProcessPool中将代码提交给所有进程以供执行

8

背景:

  • 一款使用concurrent.futures.process.ProcessPool来执行代码的Python应用程序服务器
  • 我们有时希望热重载已导入的代码,而不必重新启动整个服务器进程

(是的,我知道importlib.reload注意事项)

要使其正常工作,我想必须在由进程池管理的每个multiprocessing进程中执行importlib.reload

是否有一种方法可以向所有进程提交某些内容?

1个回答

5

我不知道你提到的热重新加载尝试会如何运作,但你真正询问的问题是可以回答的。

有没有一种方法可以将某个东西提交给进程池中的所有进程?

这里的挑战在于确保所有进程都得到了这个something,且只得到一次,并且在每个进程都得到之前不进行任何进一步的执行。

你可以使用 multiprocessing.Barrier(parties[, action[, timeout]]) 来获得这种必要的同步。障碍将阻止调用 barrier.wait() 的参与方,直到每个参与方都这样做,然后一次性释放它们所有人。

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def foo(x):
    for _ in range(int(42e4)):
        pass
    return x


def reload(something):
    print(f"{mp.current_process().name} --- reloading {something} and waiting.")
    barrier.wait()
    print(f"{mp.current_process().name} --- released.")


def init_barrier(barrier):
    globals()['barrier'] = barrier


if __name__ == '__main__':

    MAX_WORKERS = 4
    barrier = mp.Barrier(MAX_WORKERS)

    with ProcessPoolExecutor(
            MAX_WORKERS, initializer=init_barrier, initargs=(barrier,)
    ) as executor:
        print(list(executor.map(foo, range(10))))
        # then something for all processes
        futures = [executor.submit(reload, "something") for _ in range(MAX_WORKERS)]
        for f in futures:
            f.result()

        print(list(executor.map(foo, range(10))))

例子输出:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
ForkProcess-3 --- reloading something and waiting.
ForkProcess-2 --- reloading something and waiting.
ForkProcess-1 --- reloading something and waiting.
ForkProcess-4 --- reloading something and waiting.
ForkProcess-1 --- released.
ForkProcess-4 --- released.
ForkProcess-3 --- released.
ForkProcess-2 --- released.
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Process finished with exit code 0

如果你愿意将barrier设为全局变量,且multiprocessing.get_context()._name返回"fork",那么你不需要使用initializer,因为在派生子进程时可以通过继承全局变量来访问。


在您的示例中,您已经等待所有提交给 foo 的未完成任务完成,这是在提交给 reload 之前应该执行的操作,现在您正在等待所有提交给 reload 的任务完成,这也是在提交进一步任务(到 foo)之前应该执行的操作。因此,我不清楚这里屏障的目的是什么。 - Booboo
@Booboo 这个屏障确保所有进程都通过防止一个进程多次获取相同的任务来获得“重新加载”。如果没有这个屏障,您不知道有多少进程最终会执行该任务(取决于操作系统调度和任务所需时间)。 - Darkonaut
所以我认为我之前忽略的一点是,你的意思是说Barrier(MAX_WORKERS)需要MAX_WORKERS个独立的进程在调用wait之前进行调用,从而保证每个进程都重新加载。我以为wait会立即返回一个计数。真是太傻了。 - Booboo
1
@Booboo 是的,进程/工作线程总是独立的。本质上,OP正在询问如何在进程已经启动后多次运行“initializer”。将barrier.wait()注释掉并运行几次。最终您将看到一个输出,其中一个工作线程没有重新加载,因为另一个工作线程已经完成了两次任务。 - Darkonaut

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接