Python multiprocessing Pool as a decorator

6

I'm working on some code that uses Python's multiprocessing Pool class. This leads to a lot of code like this:

import time
from multiprocessing import Pool
from functools import partial

def test_func(x):
    time.sleep(1)
    return x

def test_func_parallel(iterable, processes):
    p = Pool(processes=processes)
    output = p.map(test_func, iterable)
    p.close()
    return output

This can be made more generic:

def parallel(func, iterable, **kwargs):
    func = partial(func, **kwargs)
    p = Pool(processes=6)
    out = p.map(func, iterable)
    p.close()
    return out

This works, but adding a parallel wrapper to every other function clutters the code. What I'd really like is to run it as a decorator, something like this:

def parallel(num_processes):
    def parallel_decorator(func, num_processes=num_processes):
        def parallel_wrapper(iterable, **kwargs):
            func_with_kwargs = partial(func, **kwargs)
            p = Pool(processes=num_processes)
            output = p.map(func_with_kwargs, iterable)
            p.close()
            return output

        return parallel_wrapper
    return parallel_decorator

Here is how the technique would be used:

@parallel(6)
def test_func(x):
    time.sleep(1)
    return x

This approach fails because of pickling:

Can't pickle <function test1 at 0x117473268>: it's not the same object as __main__.test1
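
As far as I can tell, pickle serializes a function by its module and qualified name and then checks that the name still resolves to the same object; after decoration, __main__.test_func is bound to the wrapper rather than to the original function captured in the closure. A minimal reproduction of the same failure without any multiprocessing (the names and the original attribute here are just for illustration):

import pickle

def decorated(func):
    def wrapper(iterable):
        # a Pool would have to pickle `func` here to send it to its workers
        return [func(x) for x in iterable]
    wrapper.original = func  # keep a handle on the undecorated function
    return wrapper

@decorated
def test_func(x):
    return x

# pickle looks up __main__.test_func, finds the wrapper instead of the
# original function object, and refuses to serialize it
try:
    pickle.dumps(test_func.original)
except pickle.PicklingError as e:
    print(e)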

I've looked at several posts on related questions, but they all implement solutions where the multiprocessing is executed outside the decorator. Does anyone know a way to make this work?


1
By the way, this is a much better question than most of the multiprocessing questions I see here -- well thought out, with a solid reproducer, and so on. - Charles Duffy
1
See "What can multiprocessing and dill do together?" -- switch to the third-party pathos.multiprocessing and you're there. - Charles Duffy
Could you try using "fork"? Or are you on Windows? - Aaron
Have you tried using copyreg? https://docs.python.org/3.7/library/copyreg.html There is also `partialmethod` in functools, but I don't know whether it would be a solution. https://docs.python.org/3.7/library/functools.html - Mark Moretto
"They all implement solutions where the multiprocessing is executed outside the decorator." >> Would you mind sharing links to those? (for my own / everyone's learning) Thanks. - p._phidot_
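
Following the pathos suggestion in the comments above, a minimal sketch of the decorator (assuming the third-party pathos package is installed; its pools serialize with dill, which ships the closed-over function by value instead of looking it up by name) might look like this:

import time
from functools import partial
from pathos.multiprocessing import ProcessingPool

def parallel(num_processes):
    def parallel_decorator(func):
        def parallel_wrapper(iterable, **kwargs):
            bound = partial(func, **kwargs)
            pool = ProcessingPool(nodes=num_processes)
            try:
                return pool.map(bound, iterable)
            finally:
                pool.close()
                pool.join()
        return parallel_wrapper
    return parallel_decorator

@parallel(6)
def test_func(x):
    time.sleep(1)
    return x

if __name__ == "__main__":
    print(test_func(range(10)))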
2 Answers

3

If you don't mind forgoing the decorator's syntactic sugar (the @ symbol), this should work:

import functools
import time

from multiprocessing import Pool


def parallel(func=None, **options):
    if func is None:
        return functools.partial(parallel, **options)

    def wrapper(iterable, **kwargs):
        processes = options["processes"]

        with Pool(processes) as pool:
            result = pool.map(func, iterable)

        return result

    return wrapper


def test(i):
    time.sleep(1)
    print(f"{i}: {i * i}")

test_parallel = parallel(test, processes=6)


def main():
    test_parallel(range(10))


if __name__ == "__main__":
    main()

Why not just call parallel(test, processes=6)(range(10))? Also, I don't think @functools.wraps(func) accomplishes anything, since you aren't wrapping func. - Booboo
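
If keeping the @ syntax with only the standard library is the goal, one possible workaround (a sketch of my own, not part of this answer, and it assumes the decorated function is defined at module top level) is to re-register the undecorated function under a new module-level name and update its __name__/__qualname__, so worker processes can still pickle it by reference:

import sys
import time
from functools import partial
from multiprocessing import Pool

def parallel(num_processes):
    def parallel_decorator(func):
        # Rebind the original function under a new name (the "_sequential"
        # suffix is arbitrary) so pickle's lookup by qualified name still
        # finds the real function object rather than the wrapper.
        module = sys.modules[func.__module__]
        func.__name__ += "_sequential"
        func.__qualname__ += "_sequential"
        setattr(module, func.__name__, func)

        def parallel_wrapper(iterable, **kwargs):
            bound = partial(func, **kwargs)
            with Pool(processes=num_processes) as p:
                return p.map(bound, iterable)

        return parallel_wrapper
    return parallel_decorator

@parallel(6)
def test_func(x):
    time.sleep(1)
    return x

if __name__ == "__main__":
    print(test_func(range(10)))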

0

I had the same problem. It comes down to how the Pool() object is implemented, which is why a plain wrapper works fine but a decorator does not. The workaround is to define your own Pool()-like implementation using Process().

This can get pretty tricky, but if you are a decorator lover, here is a (dirty) example:

# something to do
args = range(10)


def parallel(function):
    """ An alternative implementation to
    multiprocessing.Pool().map() using
    multiprocessing.Process(). """

    def interfacer(args):
        """ The wrapper function. """
        
        # required libraries
        from multiprocessing import (Queue, Process)
        from os import cpu_count
        
        # process control
        ## maximum number of processes required
        max_processes = len(args)
        
        ## maximum number of processes running
        max_threads = cpu_count() - 1
        
        """ Since there is no Pool() around
        we need to take care of the processes
        ourselves. If there is nothing for a
        processes to do, it is going to await
        for an input, if there are too many of
        them, the processor shall suffer. """
        
        # communications
        ## things to do
        inbasket = Queue()
        
        ## things done
        outbasket = Queue()
        
        """ I am thinking asynchronouly,
        there is probably a better way of
        doing this. """
        
        # populate inputs
        for each in args:
            
            ## put arguments into the basket
            inbasket.put(each)
        
        def doer():
            """ Feeds the targeted/decorated
            'function' with data from the baskets and
            collects the results.
            
            This blind function helps the
            implementation to generalize over any
            iterable. """
            
            outbasket.put(function(inbasket.get()))
            return(True)
        
        def run(processes = max_threads):
            """ Create a certain number of
            Process()s and runs each one. 
            There is room for improvements here. """
            
            # the process pool
            factory = list()
            
            # populate the process pool
            for each in range(processes):
                factory.append(Process(target = doer))
            
            # execute in process pool
            for each in factory:
                each.start()
                each.join()
                each.close()
            
            return(True)
        
        """ Now we need to manage the processes,
        and prevent them from overwhelming the CPU.
        That is the tricky part that Pool() does
        so well. """
        
        while max_processes:
        # as long as there is something to do
        
            if (max_processes - max_threads) >= 0:
                
                run(max_threads)
                max_processes -= max_threads
            
            else:
            # play it safe    
                run(1)
                max_processes -= 1
        
        # drain the queue and give back the list of 'dones'
        return([outbasket.get() for each in range(outbasket.qsize())])

    return(interfacer)

@parallel
def test(x):
    return(x**2)

print(test(args))

This code may not be very efficient, but it should give you the idea.
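
One detail worth pointing out about the example above: run() joins each Process immediately after starting it, so the workers in a batch execute one at a time rather than in parallel. A small variant of the batch runner (a sketch, not part of the original answer; like the original it assumes the "fork" start method, because the worker target is a closure) starts the whole batch before joining:

from multiprocessing import Process

def run_batch(function, inbasket, outbasket, processes):
    """ Starts every worker in the batch before joining any of them,
    so the batch really does run concurrently. """

    def doer():
        outbasket.put(function(inbasket.get()))

    workers = [Process(target=doer) for _ in range(processes)]

    for worker in workers:
        worker.start()  # launch the whole batch first...

    for worker in workers:
        worker.join()   # ...then wait for all of it to finish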

