Python多进程池惰性迭代

67

我想了解Python的Multiprocessing.Pool类如何使用map、imap和map_async方法。我的问题是,我想对创建内存密集型对象的迭代器进行映射,而且不希望所有这些对象同时生成到内存中。我想知道各种map()函数是否会使我的迭代器耗尽,或者只有子进程缓慢前进时才智能地调用next()函数,因此我进行了一些测试:

def g():
  for el in xrange(100):
    print el
    yield el

def f(x):
  time.sleep(1)
  return x*x

if __name__ == '__main__':
  pool = Pool(processes=4)              # start 4 worker processes
  go = g()
  g2 = pool.imap(f, go)
  g2.next()

继续处理map、imap和map_async等操作。然而,这是最明显的例子,因为仅调用g2上的next()一次就会打印出来自生成器g()的所有元素,而如果imap在“惰性”执行此操作,则我期望它仅调用go.next()一次,因此仅打印出“1”。

有人能否澄清正在发生的事情,并且是否有某种方法使进程池“惰性”地根据需要评估迭代器?

谢谢,

Gabe


在删除time.sleep调用并在f中添加print os.getpid(), x后,行为看起来更加奇怪,有时只打印2或3个不同的PID,并且总是执行不同数量的迭代... 顺便问一下,您使用的是哪个Python版本? - Terseus
Python 2.6.6(r266:84292,2010年12月26日,22:31:48)标准 Debian 安装。 - Gabe
5个回答

39

让我们首先看一下程序的结尾。

multiprocessing模块使用atexit来在程序结束时调用multiprocessing.util._exit_function

如果你移除g2.next(),你的程序会很快结束。

_exit_function最终会调用Pool._terminate_pool。主线程会将pool._task_handler._state的状态从RUN改变为TERMINATE。同时,pool._task_handler线程正在Pool._handle_tasks中循环处理任务,并在达到条件时退出。

            if thread._state:
                debug('task handler found thread._state != RUN')
                break

(请查看/usr/lib/python2.6/multiprocessing/pool.py)

以下代码可以防止任务处理程序完全消耗您的生成器g()。如果您查看Pool._handle_tasks,您将看到:

        for i, task in enumerate(taskseq):
            ...
            try:
                put(task)
            except IOError:
                debug('could not put task on queue')
                break

以下是消耗您生成器的代码。(taskseq不完全是您的生成器,但是由于taskseq被消耗,您的生成器也被消耗了。)

相比之下,当您调用g2.next()时,主线程会调用IMapIterator.next并在到达self._cond.wait(timeout)时等待。

主线程等待而不是调用_exit_function让任务处理线程正常运行,这意味着它会在Pool._handle_tasks函数中将任务放入workerinqueue中并完全消耗生成器。

总之,所有Pool映射函数都会消耗给定的整个可迭代对象。如果您想分批次使用生成器,则可以执行以下操作:

import multiprocessing as mp
import itertools
import time


def g():
    for el in xrange(50):
        print el
        yield el


def f(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    pool = mp.Pool(processes=4)              # start 4 worker processes
    go = g()
    result = []
    N = 11
    while True:
        g2 = pool.map(f, itertools.islice(go, N))
        if g2:
            result.extend(g2)
            time.sleep(1)
        else:
            break
    print(result)

3
很棒的答案,最终我重新实现了一个线程池,同时按元素进行消耗,但是你的islice解决方案对我来说会少很多工作,唉 :-)。我尝试在pool.py中找到一些信息,并注意到map / imap / map_async函数似乎立即使用了迭代器。但我不确定这是否在标准Pool.map()的情况下真的是必要的。 - Gabe
2
@Gabe:为了即时使用迭代器,我认为必须编写一些额外的信号机制来告诉任务处理程序何时向inqueue中再次“放置”更多任务。也许这是可能的,但目前在Pool中不存在,并且可能会稍微减慢过程的速度。 - unutbu
我的解决方案是创建一个大小为N*size_of_pool的任务队列,并调整N的值,直到队列看起来保持了良好的缓冲区。当然,这取决于任务,所以我可以理解池代码的作者不想处理这个问题。感谢您的回复! - Gabe
如果生成器是这样的,你不知道元素的数量(在这种情况下为100),该怎么办? - Vince
1
@Vince:你可以将for-loop改为while-loop,并在pool.map的结果为空时跳出循环。我已经编辑了帖子,以展示我的意思。 - unutbu

5
您需要的功能已在NuMap包中实现,该包来自以下网站:

NuMap是一种并行(基于线程或进程、本地或远程)、缓冲、多任务、itertools.imap或multiprocessing.Pool.imap函数替代品。与imap类似,它对序列或可迭代对象的元素评估函数,并以惰性方式执行。 惰性可以通过“stride”和“buffer”参数进行调整。


4

我也遇到了这个问题,并且很失望地发现 map 函数会消耗它的所有元素。我编写了一个函数,使用 multiprocessing 中的队列数据类型来延迟地消耗迭代器。这类似于 @unutbu 在评论中描述的内容,但正如他所指出的那样,它没有重新加载队列的回调机制。相反,队列数据类型公开了一个超时参数,我已经成功地使用了 100 毫秒。

from multiprocessing import Process, Queue, cpu_count
from Queue import Full as QueueFull
from Queue import Empty as QueueEmpty

def worker(recvq, sendq):
    for func, args in iter(recvq.get, None):
        result = func(*args)
        sendq.put(result)

def pool_imap_unordered(function, iterable, procs=cpu_count()):
    # Create queues for sending/receiving items from iterable.

    sendq = Queue(procs)
    recvq = Queue()

    # Start worker processes.

    for rpt in xrange(procs):
        Process(target=worker, args=(sendq, recvq)).start()

    # Iterate iterable and communicate with worker processes.

    send_len = 0
    recv_len = 0
    itr = iter(iterable)

    try:
        value = itr.next()
        while True:
            try:
                sendq.put((function, value), True, 0.1)
                send_len += 1
                value = itr.next()
            except QueueFull:
                while True:
                    try:
                        result = recvq.get(False)
                        recv_len += 1
                        yield result
                    except QueueEmpty:
                        break
    except StopIteration:
        pass

    # Collect all remaining results.

    while recv_len < send_len:
        result = recvq.get()
        recv_len += 1
        yield result

    # Terminate worker processes.

    for rpt in xrange(procs):
        sendq.put(None)

这种解决方案的优点是不需要批处理请求到Pool.map中。一个独立的工作者不会阻塞其他人的进展。但是你的情况可能会有所不同。请注意,您可能需要使用不同的对象来向工作人员发送终止信号。在示例中,我使用了None。

在“Python 2.7(r27:82525,2010年7月4日,09:01:59)[MSC v.1500 32位(Intel)] on win32”上测试通过。


我已经在Python 3.3上进行了检查,发现imapimap_unordered在启动映射函数之前并不会消耗所有参数,而map则会。 - vy32
+1 这几乎是我需要的,但不幸的是我需要有序的结果。 - letmaik
与其调整输入/输出队列的获取/放置超时时间,我通常会1)为两个队列设置固定大小,2)如果队列为空/满,则让获取/放置阻塞。这样就不需要调整超时时间。只需要检查进入输入队列和离开输出队列的项目数量即可。正确的顺序是:1)启动工作线程;2)启动输出队列收集器;3)迭代输入并填充输入队列。 - chronos
@neo,有可能获得有序的结果。实现这一点的方法是拥有 4 个限制大小的队列 [In(工人数据),Out(最终,正确排序的结果),Serial(跟踪处理中的项目),Sort(中间队列)]和两种类型的工人 - 单个 'sorter()' 和 N 个实际工人。思路是:1)有一个 'serial' 号生成器;2)将每个数据集作为 'In.put((serial, data))' 提交,连同 'Serial.put(serial)';3)工人执行 'Sort.put((serial, result))';4)'sorter()' 从 Sort 获取项目,按 'serial' 排序,并放入 Out。 - chronos
@neo,这是一个未经测试的sorter()示例:https://bitbucket.org/qmentis/bioinformatics-scripts/src/333a2d67f7931e2e44142298f64a9d1bc7ce7b59/sliding-window-polyA-trimmer.py?at=master#cl-154 忘记提到所有队列必须具有相同的大小限制,并且假定此方案中所有队列项需要大约相同的处理时间(否则sorter()的内部缓冲区将开始累积太多结果)。 - chronos

1
在这个例子中(请查看代码),有2个工人。
池按预期工作:当工人空闲时,进行下一次迭代。
这段代码与主题中的代码相同,除了一个问题:参数大小为64k。
64k是默认套接字缓冲区大小。
import itertools
from multiprocessing import Pool
from time import sleep


def f( x ):
    print( "f()" )
    sleep( 3 )
    return x


def get_reader():
    for x in range( 10 ):
        print( "readed: ", x )
        value = " " * 1024 * 64 # 64k
        yield value


if __name__ == '__main__':

    p = Pool( processes=2 )

    data = p.imap( f, get_reader() )

    p.close()
    p.join()

0
我也遇到了这个问题,但我找到了与其他答案不同的解决方法,所以我想分享一下。
import collections, multiprocessing

def map_prefetch(func, data, lookahead=128, workers=16, timeout=10):
    with multiprocessing.Pool(workers) as pool:
        q = collections.deque()
        for x in data:
            q.append(pool.apply_async(func, (x,)))
            if len(q) >= lookahead:
                yield q.popleft().get(timeout=timeout)
        while len(q):
            yield q.popleft().get(timeout=timeout)

for x in map_prefetch(myfunction, huge_data_iterator):
    # do stuff with x

基本上,它使用队列将最多lookahead个挂起的请求发送到工作池,强制限制缓冲结果的数量。工作尽快开始在该限制内运行,以便可以并行运行。此外,结果保持有序。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接