在ProcessPoolExecutor中使用ThreadPoolExecutor

18

我对futures模块感到陌生,但我的任务可以从并行化中受益;但我似乎无法弄清楚如何设置线程函数和进程函数。希望有人能提供任何帮助。

我正在运行粒子群优化(PSO)算法。不想过多介绍PSO本身,以下是代码的基本布局:

有一个Particle类,其中有一个getFitness(self)方法(计算某些指标并将其存储在self.fitness中)。PSO模拟有多个粒子实例(对于一些模拟来说很容易超过10个;对于一些模拟来说可能达到数百或数千个)。
每隔一段时间,我都需要计算粒子的适应度。目前,我通过循环来完成这个过程:

for p in listOfParticles:
  p.getFitness(args)

然而,我注意到每个粒子的适应度可以独立计算,这使得适应度计算成为并行化的首选。实际上,我可以使用 map(lambda p: p.getFitness(args), listOfParticles)

现在,我可以轻松地使用 futures.ProcessPoolExecutor 来完成这个任务:

with futures.ProcessPoolExecutor() as e:
  e.map(lambda p: p.getFitness(args), listOfParticles)

由于调用 p.getFitness 的副作用存储在每个粒子本身中,因此我不必担心从 futures.ProcessPoolExecutor() 获得返回值。

到目前为止,一切都好。但现在我注意到 ProcessPoolExecutor 创建新进程,这意味着它会复制内存,这很慢。我想共享内存,所以我应该使用线程。这很好,直到我意识到在每个进程中运行多个线程的几个进程可能会更快,因为多个线程仍然只在我的 8 核机器的一个处理器上运行。

这里是我的问题所在:
根据我看到的示例,ThreadPoolExecutorlist 上操作。同样地,ProcessPoolExecutor 也是如此。因此,在 ProcessPoolExecutor 中不能进行任何迭代操作以将其分配给 ThreadPoolExecutor,因为 ThreadPoolExecutor 然后只能处理单个对象(请参见我下面发布的尝试)。
另一方面,我不能自己对 listOfParticles 进行切片,因为我希望 ThreadPoolExecutor 做出自己的魔法来确定需要多少个线程。

那么,最重要的问题(终于来了):
我应该如何构建代码才能有效地使用进程和线程并行化以下内容:

for p in listOfParticles:
  p.getFitness()

这就是我一直在尝试的,但我不敢尝试运行它,因为我知道它不会起作用:

>>> def threadize(func, L, mw):
...     with futures.ThreadpoolExecutor(max_workers=mw) as executor:
...             for i in L:
...                     executor.submit(func, i)
... 

>>> def processize(func, L, mw):
...     with futures.ProcessPoolExecutor() as executor:
...             executor.map(lambda i: threadize(func, i, mw), L)
...

我希望能够得到您对如何解决这个问题的任何想法,甚至是关于如何改善我的方法的建议。

如果有影响的话,我正在使用python3.3.2版本。


1
getFitness() 运行什么类型的代码?在CPython中线程的问题在于它们只适合于I/O绑定任务,因为CPython有一个全局解释器锁(“GIL”),它只允许一个线程同时运行。例如,如果getFitness()运行CPU绑定的Python代码,则GIL将使线程比不使用线程更慢(线程只会增加上下文切换的额外开销)。但是,如果getFitness()运行释放了GIL的扩展模块函数,则线程可能会有所帮助(例如,许多numpy函数会释放GIL)。 - Tim Peters
getFitness 将粒子编码的信息解释为神经网络的起始语义,运行生成的神经网络,并计算输出误差(这个输出误差就是适应度 - 实际上是它的倒数)。因此,我认为这个函数更多地受到 CPU 的限制而不是 I/O 的限制(我从头开始做了所有的神经网络工作,它们都是类列表和它们的乘积)。因此,在这种情况下,线程可能并不会太有帮助,但我仍然希望能够在适用的问题中使用 ThreadPool 和 ProcessPool。 - inspectorG4dget
3个回答

20

我会给你一段混合进程和线程的工作代码来解决问题,但这并不是你所期望的;-) 首先要做的是创建一个不会危及你真实数据的模拟程序,尝试些无害的东西。下面是开始:

class Particle:
    def __init__(self, i):
        self.i = i
        self.fitness = None
    def getfitness(self):
        self.fitness = 2 * self.i

现在我们有一些东西可以玩耍了。接下来是一些常量:

MAX_PROCESSES = 3
MAX_THREADS = 2 # per process
CHUNKSIZE = 100

请根据你的具体情况,适当调整以下代码中的参数CHUNKSIZE。稍后将对其进行解释。

对于您来说,第一个惊喜是我最低级别的工作函数是做什么用的。这是因为您在这里过于乐观:

由于调用p.getFitness的副作用存储在每个粒子本身中,因此我不需要担心从futures.ProcessPoolExecutor()获取返回值。

然而,工作进程中执行的任何操作都不能对主程序中的Particle实例产生任何影响。无论是通过fork()的写时复制实现还是因为它正在处理从跨进程传递的Particle pickle中未拆包的副本。

因此,如果您希望主程序可以看到适应度结果,则需要安排将信息发送回主程序。因为我不了解您的实际程序情况,在这里我假设Particle().i是唯一的整数,并且主程序可以轻松地将整数映射回Particle实例。因此,在此处,最低级别的工作函数需要返回一对:唯一的整数和适应度结果:

def thread_worker(p):
    p.getfitness()
    return (p.i, p.fitness)

考虑到这一点,很容易将一个 粒子 列表分布在不同的线程中,并返回一个由(粒子编号, 适应度)结果组成的列表:

def proc_worker(ps):
    import concurrent.futures as cf
    with cf.ThreadPoolExecutor(max_workers=MAX_THREADS) as e:
        result = list(e.map(thread_worker, ps))
    return result

注意:

  1. 这是每个工作进程将运行的函数。
  2. 我正在使用Python 3,因此使用list()强制e.map()将所有结果材料化为一个列表。
  3. 如评论中所述,在CPython下,将CPU绑定任务分散到线程中比在单个线程中完成它们要

现在只需要编写代码来将Particle列表分布到进程中,并检索结果。 这很容易通过multiprocessing实现,因此我要使用它。 我不知道concurrent.futures是否可以做到这一点(考虑到我们还混合了线程),但我并不在乎。 但是,因为我会提供可工作的代码,您可以尝试并回报;-)

if __name__ == "__main__":
    import multiprocessing

    particles = [Particle(i) for i in range(100000)]
    # Note the code below relies on that particles[i].i == i
    assert all(particles[i].i == i for i in range(len(particles)))

    pool = multiprocessing.Pool(MAX_PROCESSES)
    for result_list in pool.imap_unordered(proc_worker,
                      (particles[i: i+CHUNKSIZE]
                       for i in range(0, len(particles), CHUNKSIZE))):
        for i, fitness in result_list:
            particles[i].fitness = fitness

    pool.close()
    pool.join()

    assert all(p.fitness == 2*p.i for p in particles)

注意:

  1. 我正在手动将Particle列表分成块。这就是CHUNKSIZE的作用。因为工作进程希望处理一个列表Particle,反过来,这是因为futures map()函数想要这样做。无论如何,分块工作都是个好主意,这样你就能得到一些真正的回报,以换取每次调用的进程间开销。
  2. imap_unordered()不能保证返回结果的顺序。这给了实现更多的自由,以尽可能高效地安排工作。我们在这里不关心顺序,所以没问题。
  3. 请注意,循环检索(particle_id, fitness)结果,并相应地修改Particle实例。也许你真正的.getfitness会对Particle实例进行其他变异——猜不出。无论如何,主程序永远不会看到工人“通过魔术”进行的任何变异——你必须明确安排。在极限情况下,你可以返回(particle_id, particle_instance)对,然后在主程序中替换Particle实例。然后它们将反映在工作进程中进行的所有变异。

玩得开心 :-)

未来无限

结果证明,很容易替换multiprocessing。以下是更改内容。这也(如早期提到的)替换了原始的Particle实例,以捕获所有变化。不过,在这里存在一个权衡:拣选实例需要比拣选单个“适应度”结果多“很多”字节。有更多的网络流量。选择你的毒药 ;-)

只需将thread_worker()的最后一行替换为返回变异实例,如下所示:

return (p.i, p)

然后用以下内容替换所有的主要(main)块:

def update_fitness():
    import concurrent.futures as cf
    with cf.ProcessPoolExecutor(max_workers=MAX_PROCESSES) as e:
        for result_list in e.map(proc_worker,
                      (particles[i: i+CHUNKSIZE]
                       for i in range(0, len(particles), CHUNKSIZE))):
            for i, p in result_list:
                particles[i] = p

if __name__ == "__main__":
    particles = [Particle(i) for i in range(500000)]
    assert all(particles[i].i == i for i in range(len(particles)))

    update_fitness()

    assert all(particles[i].i == i for i in range(len(particles)))
    assert all(p.fitness == 2*p.i for p in particles)

这段代码与multiprocessor的代码非常相似。就我个人而言,我会使用multiprocessing版本,因为imap_unordered非常有用。这是简化接口的问题:它们通常以隐藏有用的可能性为代价来获得简单性。


不客气 :-) 刚刚有个编辑:实际上 multiprocessing 并不是必需的。 - Tim Peters
在进行并行处理时,何时使用ProcessPoolExecutor()而不是ThreadPoolExecutor(),或者反之?正如这里所述,您甚至可以使用“Future”对象将它们结合起来:S? - Melroy van den Berg
@TimPeters 你在函数内部导入concurrent.futures的原因是什么? - Al Guy
@TimPeters 我可以请你看一下我的问题吗?https://stackoverflow.com/questions/63306875/combining-multithreading-and-multiprocessing-with-concurrent-futures - Al Guy

4

首先,您确定在所有内核中运行进程时利用多个线程是否有益?如果是CPU密集型任务,可能不会。至少需要进行一些测试。

如果添加线程可以提高性能,下一个问题是手动负载平衡或自动负载平衡是否可以实现更好的性能。手动负载平衡是指将工作负载仔细分成类似计算复杂度的块,并为每个块实例化一个新的任务处理器。这是您最初但存在疑问的解决方案。自动负载平衡是创建进程/线程池并在工作队列上通信以获取新任务,这是您努力追求的方案。在我看来,第一种方法是Apache Hadoop范例之一,第二种方法是由作业队列处理器(例如Celery)实现的。第一种方法可能会受到某些任务块较慢且正在运行而其他完成的影响,而第二种方法会增加通信和等待任务开销,这是要进行性能测试的第二个点。

最后,如果您希望在静态进程集合中具有多线程功能,则据我所知,您无法使用当前的concurrent.futures库,而必须稍作修改。我不知道,是否有现有的解决方案来执行此任务,但由于concurrent是一个纯python解决方案(没有C代码),因此可以很容易地完成。工作处理器在ProcessPoolExecutor类的_adjust_process_count 方法中定义,并且通过子类化和重写它以实现多线程方法相当简单,您只需提供基于concurrent.features.thread的自定义_process_worker即可。

下面是参考的原始ProcessPoolExecutor._adjust_process_count

def _adjust_process_count(self):
    for _ in range(len(self._processes), self._max_workers):
        p = multiprocessing.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue))
        p.start()
        self._processes[p.pid] = p

我更倾向于选择自动负载平衡。这是因为分布虽然对我的模拟有所帮助,但并不是最重要的。所以,我试图在最小的编程工作量下实现更好的效率。但是,针对你的第一个观点,为什么我很难通过多个进程和每个进程的多个线程来提高CPU绑定任务的性能呢? - inspectorG4dget
@inspectorG4dget,没有实际的getFitness代码检查、CPU架构和使用的命令,以及取决于许多因素,很难说,但主要原因可能是CPU上下文切换、CPU缓存未命中等。您是否已经编写了一个多进程/多线程执行器来覆盖调整函数,或者需要更多帮助? - alko
我还没来得及编写一个覆盖调整函数。恐怕这有点超出了我的能力范围。但更重要的是,我不是在寻找绝对最佳的解决方案。我想要通过最小的努力获得一些速度提升,所以我不介意一个次优的解决方案,只要它仍然比单个单线程进程好就行。 - inspectorG4dget

2

这是一个通用的答案,利用了 threadedprocess 包,该包实现了 ThreadedProcesPoolExecutor,允许在进程池内使用线程池。以下是一个相对通用的实用函数,它使用了这个包:

import concurrent.futures
import logging
from typing import Callable, Iterable, Optional

import threadedprocess

log = logging.getLogger(__name__)


def concurrently_execute(fn: Callable, fn_args: Iterable, max_processes: Optional[int] = None, max_threads_per_process: Optional[int] = None) -> None:
    """Execute the given callable concurrently using multiple threads and/or processes."""
    # Ref: https://dev59.com/7WIj5IYBdhLWcg3wsm-n#57999709/
    if max_processes == 1:
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_threads_per_process)
    elif max_threads_per_process == 1:
        executor = concurrent.futures.ProcessPoolExecutor(max_workers=max_processes)  # type: ignore
    else:
        executor = threadedprocess.ThreadedProcessPoolExecutor(max_processes=max_processes, max_threads=max_threads_per_process)

    if max_processes and max_threads_per_process:
        max_workers = max_processes * max_threads_per_process
        log.info("Using %s with %s processes and %s threads per process, i.e. with %s workers.", executor.__class__.__name__, max_processes, max_threads_per_process, max_workers)

    with executor:
        futures = [executor.submit(fn, *fn_args_cur) for fn_args_cur in fn_args]

    for future in concurrent.futures.as_completed(futures):
        future.result()  # Raises exception if it occurred in process worker.

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接