如何同时加入一个 multiprocessing.Process() 的列表?

8

给定一个正在运行的multiprocessing.Process实例的list(),如何在没有Process.join超时和循环的情况下加入所有实例并返回一次退出的结果?

示例

from multiprocessing import Process
from random import randint
from time import sleep
def run():
    sleep(randint(0,5))
running = [ Process(target=run) for i in range(10) ]

for p in running:
    p.start()

如何在至少有一个p中的Process退出前阻塞?

我不想做的事情是:

exit = False
while not exit:
    for p in running:
        p.join(0)
        if p.exitcode is not None:
            exit = True
            break

2
你也许不应该使用multiprocessing.Process。如果您使用concurrent.futures.ProcessPoolExecutor,那么您所要求的事情就很简单了:只需使用map并在迭代器上调用next即可。 - Bakuriu
map(Process.join, running) 可以吗? - Green Cloak Guy
据我所见,这只会在所有进程退出后才返回。这不是我想要的。将此作为问题澄清添加进去。谢谢。 - Ente
@Bakuriu 我还不明白这如何解决我的问题。我能看到的是,每次调用 next 都会运行一个进程。我希望所有进程都可以并行启动,并且在至少有一个进程存在时检测到它。如果很简单,您可以贴一小段示例代码来演示您的方法吗? - Ente
1
只是为了明确- 当任何一个子进程完成并放弃其余的进程时,您真的想退出应用程序吗? - Paul Becotte
1
@Paul 不是的。我只是想停止等待。等待后触发的逻辑对于问题并不重要,因此我将其留空了。 - Ente
2个回答

12
您可以使用 multiprocessing.connection.wait()(Python 3.3+)一次等待多个Process.sentinel。只要进程退出,sentinel 就会变为可用状态,并解除 connection.wait() 的阻塞。

multiprocessing.connection.wait(object_list, timeout=None)

等待 object_list 中的某个对象准备就绪,并返回准备就绪的对象列表。如果设置了 float 类型的 timeout 参数,则调用将最多阻塞该数量的秒。如果设置了 None,则它将无限期地阻塞。负 timeout 值相当于零超时。

对于 Unix 和 Windows,如果对象满足以下条件,则可以出现在 object_list 中:

  • 可读的 Connection 对象;

  • 已连接并且可读的 socket.socket 对象;或

  • 某个 Process 对象的 sentinel 属性。

当有数据可以从连接或套接字对象中读取,或另一端已关闭时,则连接或套接字对象已准备就绪。...

from multiprocessing import Process, connection, current_process
from random import randint
from time import sleep
from datetime import datetime


def run():
    sleep(randint(2,10))
    print(f"{datetime.now()} {current_process().name} exiting")


if __name__ == '__main__':

    pool = [Process(target=run) for _ in range(4)]

    for p in pool:
        p.start()

    print(f"{datetime.now()} {current_process().name} waiting")
    connection.wait(p.sentinel for p in pool)
    print(f"{datetime.now()} {current_process().name} unblocked")

示例输出:

2019-07-22 21:54:07.061989 MainProcess waiting
2019-07-22 21:54:09.062498 Process-3 exiting
2019-07-22 21:54:09.063565 MainProcess unblocked
2019-07-22 21:54:09.064391 Process-4 exiting
2019-07-22 21:54:14.068392 Process-2 exiting
2019-07-22 21:54:17.062045 Process-1 exiting

Process finished with exit code 0

这正是我一直在寻找的。我知道只需使用多进程就可以实现。谢谢,非常好的答案! - Ente
1
我该如何确定哪个进程解除了主进程的阻塞? - Mr. Developerdude
2
@LennartRolland 在启动进程之后,您将 pool 列表转换为字典,其中将进程的 sentinel 值映射到相应的 Process 实例。然后,当 connection.wait() 返回带有 sentinel 的时候,您在 pool 字典中查找。 - Darkonaut

0

按照您的要求,确切地说,没有办法做到这一点-这就是API设置的方式。但如果您可以将其提升到创建进程列表的级别,那么有许多优秀的解决方案。

最好的方法可能是使用multiprocessing.Pool.imap_unordered()。它将接受一个函数和一个可迭代的输入,创建一堆进程,并将输入提供给进程。它返回一个可迭代对象,next方法将等待值准备就绪,然后在每个值变为可用时返回。

如果您无法将问题转化为函数+输入,则下一个解决方案是使用某些同步原语。对于我猜测您想要完成的任务,我会使用信号量-

sem = Semaphore(0)

def build_proc(the_sem):
    do_some_work
    the_sem.release()

myprocs = [buld_proc(sem) for _ in range(10)]

# in your code-
start_procs(myprocs)
done = 0
while done < len(myprocs):
    sem.acquire()
    do_post_processing()

如果你真的不需要循环,事件也可以起作用,只需等待第一个进程设置它。如果你真的无法以任何方式修改创建进程的函数,我能想象到的最终解决方案是(相当糟糕哈哈)- 使用线程池为每个进程设置一组等待者。

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def waiter(proc):
    proc.join()

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(waiter, p) for p in processes]
    # this will return as soon as one completes
    results = wait(futures, return_when=FIRST_COMPLETED)

抱歉,看了@Darkonaut的答案后,我认为你的答案是不正确的。另外,使用信号量并不能达到我想要的效果,因为它再次按顺序检查进程,并且如果至少有一个进程存在,则不会继续执行。除非我们回退到轮询和循环。 - Ente
不错,另一个答案肯定更好。我花了一段时间仔细检查文档,也在找那个哈哈!然而,Semaphore进程并没有执行“检查循环”...它将相同的信号量传递给所有子进程。每个子进程都会向信号量添加一个,只有当它达到1或更多时,父进程才会继续。尽管如此,上面的答案肯定更好。 - Paul Becotte

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接