多进程.Pool.map不能并行工作

3

我希望实现的目标:

并行化一个函数,每次调用生成一定数量的线程,像这样:

 - PROCESS01 -> 16 Threads
 - PROCESS02 -> 16 Threads
 - ...
 - PROCESSn -> 16 Threads

代码:

with multiprocessing.Pool(4) as process_pool:
    results = process_pool.map(do_stuff, [drain_queue()])

drain_queue() 返回一个项目列表时,

do_stuff(item_list):
    print('> PID: ' + str(os.getpid()))
    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
        result_dict = {executor.submit(thread_function, item): item for item in item_list}
        for future in concurrent.futures.as_completed(result_dict):
            pass

thread_function()会处理传递给它的每个项目。

然而,当执行代码时,输出如下:

> PID: 1000
(WAITS UNTIL THE PROCESS FINISHES, THEN START NEXT)
> PID: 2000
(WAITS UNTIL THE PROCESS FINISHES, THEN START NEXT)
> PID: 3000
(WAITS UNTIL THE PROCESS FINISHES, THEN START NEXT)
> PID: 3000
(WAITS UNTIL THE PROCESS FINISHES, THEN START NEXT)

这里是任务管理器的截图

我错过了什么吗?我无法弄清楚为什么它不能按预期工作。 谢谢!


线程和进程是完全不同的东西。我不确定为什么你要混淆它们。全局解释器锁(GIL)确保只有一个线程可以同时执行字节码。 - roganjosh
因此,串行执行是由GIL引起的,它强制每个进程等待线程执行吗? - Temperosa
其实,这是个好问题。一旦启动多进程,如果它们是分开的进程,则跨进程的线程操作是否会相互干扰我不确定。我不能明确地回答,所以我推迟回答这个问题。 - roganjosh
1个回答

6
我发现问题所在。map()的第二个参数应该是可迭代的,而在我的情况下是包含单个对象的列表。

错在哪里?就是这个:[drain_queue()] ,它生成一个只包含一个对象的列表。

在这种情况下,代码:

with multiprocessing.Pool(4) as process_pool:
    results = process_pool.map(do_stuff, [drain_queue()])

multiprocessing.Pool.map 会强制将一个单一的对象分配给一个进程,即使它创建了 n 个进程,工作仍然只会被一个进程完成。幸运的是,这与 GIL 的限制无关。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接