Python中的嵌套并行性

20

我正在尝试使用Python进行多处理器编程。以Fibonacci为例,采用分而治之的算法。程序执行流会像树一样分支并行执行。换句话说,我们有一个嵌套并行的例子。

从Java中,我使用了线程池模式来管理资源,因为程序可能会很快地分支并创建太多短生命周期的线程。可以通过ExecutorService实例化单个静态(共享)线程池。

我希望Pool也是如此,但似乎Pool对象不能全局共享。例如,使用multiprocessing.Manager.Namespace()共享Pool将导致错误。

无法在进程间传递或pickle池对象

我有一个2部分的问题:

  1. 我在这里缺少什么?为什么进程之间不应该共享池?
  2. 如何实现Python中的嵌套并行模式?如果可能的话,保持递归结构而不是迭代。

from concurrent.futures import ThreadPoolExecutor

def fibonacci(n):
    if n < 2:
        return n
    a = pool.submit(fibonacci, n - 1)
    b = pool.submit(fibonacci, n - 2)
    return a.result() + b.result()

def main():
    global pool

    N = int(10)
    with ThreadPoolExecutor(2**N) as pool:
        print(fibonacci(N))

main()

Java

public class FibTask implements Callable<Integer> {

    public static ExecutorService pool = Executors.newCachedThreadPool();
    int arg;

    public FibTask(int n) {
        this.arg= n;
    }

    @Override
    public Integer call() throws Exception {
        if (this.arg > 2) { 
            Future<Integer> left = pool.submit(new FibTask(arg - 1));
            Future<Integer> right = pool.submit(new FibTask(arg - 2));
            return left.get() + right.get();
        } else {
            return 1;
        }

    } 

  public static void main(String[] args) throws Exception {
      Integer n = 14;
      Callable<Integer> task = new FibTask(n);
      Future<Integer> result =FibTask.pool.submit(task); 
      System.out.println(Integer.toString(result.get()));
      FibTask.pool.shutdown();            
  }    

}

我不确定这里是否有影响,但我忽略了“进程”和“线程”的区别;对我来说,它们都意味着“虚拟处理器”。我的理解是,池的目的是为了共享“池”或资源。运行任务可以向池发出请求。随着其他线程上的并行任务完成,这些线程可以被回收并分配给新任务。不允许共享池似乎对我没有意义,因为每个线程都必须实例化自己的新池,这似乎会破坏线程池的目的。

2
@InbarRose 问题在于,在执行递归调用的递归函数中,进程池被分叉并且也被子进程调用。这会导致队列出现问题,因此无法正常工作。不管怎样,我想强调的是,在Java中你正在使用线程。使用线程没有任何问题,因为没有池对象的分叉。我相信在Java中使用进程池会导致更或多或少相同的行为。 - Bakuriu
@InbarRose 我也尝试将Pool作为类实例和静态变量进行包含,但仍然遇到相同的问题。例如,使用Pool并将递归调用包含在单个类中,但这样做仍然会导致相同的问题:>池对象不能在进程之间传递... - T. Webster
1
@T.Webster:“代码为什么不能工作?”:1.思考一下,为什么会同时存在apply()apply_async()方法?2.Python池更像是FixedThreadPool(这意味着如果线程不够就会出现死锁),而不是CachedThreadPool(在这种情况下,斐波那契数列会创建一个神话般的分叉炸弹)。以下是Python中的代码示例(虽然没有什么意义) concurrent.futuresmp.dummy - jfs
1
尝试在您自己的计算机上运行代码。如果您的环境允许创建足够的线程,则应该可以正常工作。在Python 2上,其中一个脚本需要pip install futures - jfs
1
但是我忽略了“进程”和“线程”的区别,这是不可忽视的。虽然混合使用线程和多进程是可能的,但您必须了解Python实现的常见陷阱(假定为CPython)-如果线程正在执行连续工作,则不会从使用线程中获得大的性能提升-请阅读有关全局解释器锁的信息。由于Python解释器一次只允许执行一个线程,在每个X操作后切换线程,因此可以清楚地看出,在Python中进行线程处理对您的任务没有用处。请考虑仅使用多进程。 - ElmoVanKielmo
显示剩余6条评论
2个回答

5

1) 我错过了什么; 为什么进程之间不应该共享一个池(Pool)?

并不是所有的对象/实例都可以被 pickle/序列化,而在这种情况下,池(Pool)使用的线程锁(threading.lock)是不能被 pickle 的:

>>> import threading, pickle
>>> pickle.dumps(threading.Lock())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
[...]
  File "/Users/rafael/dev/venvs/general/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle lock objects

更好的选择:
>>> import threading, pickle
>>> from concurrent.futures import ThreadPoolExecutor
>>> pickle.dumps(ThreadPoolExecutor(1))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File 
[...]
"/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 306, in save
        rv = reduce(self.proto)
      File "/Users/rafael/dev/venvs/general/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex
        raise TypeError, "can't pickle %s objects" % base.__name__
    TypeError: can't pickle lock objects

如果你仔细想一下,就会发现有道理,锁是由操作系统管理的信号量原语(因为Python使用本机线程)。能够在Python运行时内部对该对象状态进行pickle和保存并没有实现任何有意义的事情,因为其真正的状态是由操作系统保持的。
2)Python中实现嵌套并行的模式是什么?如果可能的话,保持递归结构,而不是换成迭代。
现在,对于这个例子,我上面提到的所有内容都不适用,因为你正在使用线程(ThreadPoolExecutor),而不是进程(ProcessPoolExecutor),所以不需要跨进程共享数据。
你的Java示例似乎更有效,因为你使用的线程池(CachedThreadPool)会根据需要创建新线程,而Python执行器实现是有限制的,并且需要显式的最大线程数(max_workers)。两种语言之间还有一些语法差异,这似乎也让你感到困惑(Python中的静态实例基本上是未明确作用域的任何内容),但实质上,两个示例将创建完全相同数量的线程才能执行。例如,以下是一个相当天真的CachedThreadPoolExecutor实现的Python示例:
from concurrent.futures import ThreadPoolExecutor

class CachedThreadPoolExecutor(ThreadPoolExecutor):
    def __init__(self):
        super(CachedThreadPoolExecutor, self).__init__(max_workers=1)

    def submit(self, fn, *args, **extra):
        if self._work_queue.qsize() > 0:
            print('increasing pool size from %d to %d' % (self._max_workers, self._max_workers+1))
            self._max_workers +=1

        return super(CachedThreadPoolExecutor, self).submit(fn, *args, **extra)

pool = CachedThreadPoolExecutor()

def fibonacci(n):
    print n
    if n < 2:
        return n
    a = pool.submit(fibonacci, n - 1)
    b = pool.submit(fibonacci, n - 2)
    return a.result() + b.result()

print(fibonacci(10))

性能调优:

我强烈建议使用gevent,因为它可以在没有线程开销的情况下提供高并发。虽然并非总是如此,但您的代码实际上是使用gevent的代表案例。这里有一个示例:

import gevent

def fibonacci(n):
    print n
    if n < 2:
        return n
    a = gevent.spawn(fibonacci, n - 1)
    b = gevent.spawn(fibonacci, n - 2)
    return a.get()  + b.get()

print(fibonacci(10))

虽然完全不科学,但在我的电脑上,以上代码的运行速度比其多线程版本快9倍

希望这有所帮助。


1
gevent 不提供任何并行性。 - Pi Delport
没错,没有计算并行性,但原始问题并不是要改变所选择的斐波那契算法,而是提出一种常见模式来改进它。 - Rafael Ferreira
不需要进行算法更改:示例已经将工作分成了独立的子任务。所需的只是一个实际可以并行执行任务的基础设施(即不是像gevent这样的并发解决方案)。 - Pi Delport
在Java 7+中,现在可以使用ForkJoin任务API在线程池内部实现有限数量线程的嵌套并行处理。据我所知,在Python中没有相应的API。 - ogrisel

1

1. 我在这里漏掉了什么; 为什么进程之间不应该共享一个池?

通常情况下,无论使用哪种语言,都不能在进程之间共享操作系统线程。

您可以安排将池管理器与工作进程共享访问,但这可能不是解决任何问题的好方法;请参见下面的内容。

2. Python中实现嵌套并行的模式是什么?如果可能,保持递归结构,而不是迭代。

这很大程度上取决于您的数据。

在CPython上,一般的答案是使用实现高效并行操作的数据结构。NumPy的优化数组类型就是一个很好的例子:这里是使用它们将大型数组操作分割成多个处理器核心的示例。

使用阻塞递归实现的斐波那契函数对于任何基于工作池的方法来说都是一个特别糟糕的选择,因为fib(N)将花费大量时间来占用N个工作者什么也不做,只是等待其他工作者。有许多其他方法可以特别处理斐波那契函数(例如使用CPS来消除阻塞并填充恒定数量的工作者),但最好根据你将要解决的实际问题来决定你的策略,而不是像这样的例子。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接