多进程在Python中不能平均分配任务

4
我正在尝试使用Pool将所有作业均匀地分配给16个处理器。我注意到最初会产生16个进程。几秒钟后,只有2个进程执行其余一小部分工作的所有工作。无论我增加多少负载,似乎都会稳定减少在此工作上工作的进程数。最终,只有1或2个进程通过其余作业。
以下是我的代码中的多进程片段。
c_size = len(sampled_patterns) / (cpu_count() -1)

pool = Pool(processes=cpu_count() -1)
works = [(pattern, support_set, hit_rates) for pattern,support_set in sampled_patterns.items()]
pool.starmap(get_hit_rules,works, chunksize=int(c_size))

有没有办法利用所有的16个处理器来最大化并行化?谢谢!

编辑!以下是任务分配方式。计数器使用pid作为键,任务数作为值。

Counter({30179: 14130, 30167: 13530, 30169: 12900, 30173: 12630, 30165: 12465, 30177: 12105, 30163: 11820, 30175: 11460, 30161: 10860, 30181: 10725, 30183: 9855, 30157: 8695, 30159: 6765, 30171: 4860, 30155: 1770})

chunksize 并没有做你想要的事情 - 如果您想将 works 可迭代对象分割成相同大小的块并在池中的所有进程之间进行分配,那么请将其设置为池中存在的进程数(即 pool._processes)。不过,如果您确实需要这样做,那么真正的问题是为什么您需要一个 Pool - zwer
谢谢。这是我第一次使用多进程编写代码。我使用了Pool,因为相比于生成许多Process,它看起来不那么可怕。我认为Pool会为我处理好这些事情。现在我正在阅读关于Pool和Process的比较,是否有更好的方法? - Raja
如果works中的项目数量达到了百万甚至十亿级别,那么我认为使用Pool比创建大量的Process更加适合。 - Raja
更改钻头尺寸并没有帮助!! - Raja
Python多进程:理解chunksize背后的逻辑 - Darkonaut
1个回答

3

好的,我将作为答案对此进行扩展。

multiprocessing.Pool 的整个目的是生成一定数量的进程,并以“先到先得”的方式在它们之间分配工作。这意味着如果您有 n 个要处理的项目和池中的 p 个进程,则会选择 p(或者如果定义了 chunksize,则选择 p * chunksize)个项目,并将每个项目发送到一个单独的进程进行处理。一旦进程完成处理一个项目并有效地释放,如果仍有未处理的项目,池将选择下一个项目,将其发送到已释放的进程中,依此类推,直到没有更多项目剩余。这确保了您生成的进程的最佳利用,而无需自己管理分布。

这也意味着 multiprocessing.Pool 并不适用于每种情况。根据所提供的代码,在您的情况下,您希望均匀分配可迭代项到固定数量的进程中,因此池只是额外负担 - 一旦进程完成,就不会再有更多数据需要分配。如果您只想拆分数据并将每个块发送到不同的进程,那么只需简单地执行以下操作:

import multiprocessing

if __name__ == "__main__":  # always guard your multiprocessing code
    cores = max(multiprocessing.cpu_count() - 1, 1)  # ensure at least one process

    works = [(p, s, hit_rates) for p, s in sampled_patterns.items()]
    chunk_size = (len(works) + cores - 1) // cores  # rough chunk size estimate

    processes = []  # a simple list to hold our process references
    for i in range(cores):
        work_set = works[i*chunk_size:(i+1)*chunk_size]
        process = multiprocessing.Process(target=get_hit_rules, args=(work_set,))
        process.start()
        processes.append(process)

    results = [process.join() for process in processes]  # get the data back

这将完全实现您尝试完成的操作 - 启动 cpu_count() 个进程,并将数据均匀地分成一个个大小相等的块,以便所有数据可以同时进行并行处理(最后一个进程获取的数据会略少一些)。
当然,如果您的数据太大,正如您在评论中补充的那样,这将变得难以管理,此时可以回退到 multiprocessing.Pool,将可管理的数据块发送到生成的进程以便逐个处理。此外,构建 works 列表也是无意义的 - 您为什么要构建一个包含数十亿项的列表,而您已经在 sampled_patterns 字典中拥有了这些数据呢?
为什么不直接从 sampled_patterns 字典中发送单个项,而不需要构建一个中间列表,以便将其映射到 multiprocessing.Pool 呢?为此,您只需要创建某种迭代器切片,并将其提供给 multiprocessing.Pool.imap,然后让池在内部管理剩余部分,所以:
import multiprocessing

def patterns_slicer(patterns, size, hit_rates):
    pos = 0  # store our current position
    patterns = patterns.items()  # use the items iterator
    while pos < len(patterns):
        yield [(p, s, hit_rates) for p, s in patterns[pos:pos+size]]
        pos += size

if __name__ == "__main__":  # always guard your multiprocessing code
    cores = max(multiprocessing.cpu_count() - 1, 1)  # ensure at least one process
    pool = multiprocessing.Pool(processes=cores)
    # lets use chunks of 100 patterns each
    results = pool.imap(get_hit_rules, patterns_slicer(sampled_patterns, 100, hit_rates))

当然,multiprocessing.Pool.imap 做了很多预读取操作,因此如果你的原始数据过大或者你想使用巨大的块,你可能需要考虑实现自己的 imap 并进行即时数据检索。请参考这个答案中的示例。

感谢您的努力和时间!非常感激。我还没有能够让我的代码与imap一起工作。可能我应该提到的一件事是,我正在使用Manager().dict()来更新每个进程的结果。如果我将我的作业作为生成器传递给imap,它可以正常工作。但是当我传递iterator_slicer时却什么也没发生。 - Raja

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接