在Python中实现流水线生成器

3

我有一个Python数据生成器和一组昂贵的操作,我想对这些数据执行。简单来说,对于每个数据,我想执行d(g(h(q(x)))),其中x是数据。我想通过使用处理管道部分隐藏执行这些操作的代码。

另一种思考问题的方式是,在每个阶段,我希望有一组工人通过队列读取前一个阶段的结果,进行处理,然后将结果放置在另一个队列上。

我的当前解决方案(有效)是:

from multiprocessing.pool import ThreadPool 

class FuncIterator(object):
    def __init__(self, func, base_iterator, pool_size=10):

        self.func = func
        self.base_iterator = base_iterator

        self.pool = ThreadPool(pool_size)

    def __iter__(self):
        aa = self.pool.imap(self.func, self.base_iterator, chunksize=1)

        for item in aa:
            yield item

这种解决方案的问题在于队列是无界的;也就是说,生产者可能会领先于消费者,从而导致内存使用无限增长。我希望通过限制中间队列的大小来防止这种情况发生。

我的第一个想法是使用显式的 Queue 实现以下内容:

from multiprocessing.pool import Queue

def get_queue(func, f_iter, maxsize=5):
    queue = Queue.Queue(maxsize=maxsize)

    def runner(source):
        for entry in source:
            queue.put(func(entry), True)
        queue.put(StopIteration)

    process = ThreadPool.Process(target=runner, args=(f_iter,))
    process.start()
    return queue

那么我如何控制使用多少个工作者?


不确定你的问题。你有一组操作,打算以某种管道方式链接起来以提高可读性,还是我漏掉了什么?如果下一个操作依赖于前一个操作的结果,那么使用线程只会引入额外的开销而没有任何好处。 - noxdafox
1个回答

0
这是我想出的解决方案。它采用了我的原始解决方案,解决了无界队列问题,并使用了一个Semaphore
def _do_sem2(sem, x):
    sem.acquire()
    return x

class FuncIterator(object):
    def __init__(self, func, base_iterator, pool_size=10, queue_size=10):

        self.func = func
        self.base_iterator = base_iterator

        self.pool = ThreadPool(pool_size)
        self.sem = BoundedSemaphore(queue_size)

    def __iter__(self):
        aa = self.pool.imap(self.func, (_do_sem2(self.sem, x) for x in self.base_iterator), chunksize=1)

        for item in aa:
            self.sem.release()
            yield item

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接