不同函数的多进程池

25

大多数Multiprocess Worker Pools的例子都是在不同的进程中执行单个函数,例如:

def foo(args):
   pass

if __name__ == '__main__':
   pool = multiprocessing.Pool(processes=30)
   res=pool.map_async(foo,args)

有没有办法在进程池内处理两个不同且独立的函数?这样,您可以为foo()分配15个进程,为bar()分配15个进程,或者进程池仅限于单个函数?或者您必须手动为不同的函数创建不同的进程?

 p = Process(target=foo, args=(whatever,))
 q = Process(target=bar, args=(whatever,))
 q.start()
 p.start()

并忘记工作池?

6个回答

29

要传递不同的函数,你可以简单地多次调用map_async

以下是一个例子来说明这一点,

from multiprocessing import Pool
from time import sleep

def square(x):
    return x * x

def cube(y):
    return y * y * y

pool = Pool(processes=20)

result_squares = pool.map_async(f, range(10))
result_cubes = pool.map_async(g, range(10))

结果将会是:

>>> print result_squares.get(timeout=1)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

>>> print result_cubes.get(timeout=1)
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

2
它们会并行执行还是“依次”执行? - dorvak
4
map_async 立即返回。只要池中有足够的空闲进程,新任务就会被运行而无需等待。在上面的示例中,它们将并行运行。@mad_scientist - Ocaj Nires
1
谢谢!但是没有办法分配特定数量的工作进程,我猜是吗? - dorvak
2
multiprocessing Pool API不提供在同一池中分配特定数量的工作进程的机制。如果您确实需要每个任务具有特定数量的工作进程,请创建不同的池。尽管建议仅使用单个池。我想这是有道理的,因为池应该在您不必担心的情况下透明地为您管理它。 - Ocaj Nires
谢谢你的回答,你确定在一个接一个地添加 map_async() 会并行运行吗?我实际上已经尝试过这个方法,正如 @Sam 的回答所示,它们似乎是按顺序运行的。 - Zhubarb

12

您可以使用map或一些lambda函数(编辑:实际上不能使用lambda函数)。您可以使用简单的map函数:

您可以使用map或某些lambda函数(编辑:实际上不能使用lambda函数)。您可以使用简单的map函数:

def smap(f, *args):
    return f(*args)

pool = multiprocessing.Pool(processes=30)
res=pool.map(smap, function_list, args_list1, args_list2,...)

正常的map函数需要将可迭代对象作为输入,这样很不方便。


2
这应该被接受为正确答案,因为已接受的答案在准并行模式下运行(带有糟糕的计划器)。 - ARA1307
1
"pool.map" 不需要那么多的参数。这段代码可以在 Python 内置的 "map" 中运行,但似乎不能直接在多进程上下文中工作。 - Attila the Fun

9
这里是@Rayamon分享的想法的一个可行示例:
import functools

from multiprocessing import Pool


def a(param1, param2, param3):
    return param1 + param2 + param3


def b(param1, param2):
    return param1 + param2


def smap(f):
    return f()


func1 = functools.partial(a, 1, 2, 3)
func2 = functools.partial(b, 1, 2)

pool = Pool(processes=2)
res = pool.map(smap, [func1, func2])
pool.close()
pool.join()
print(res)

如何将值列表作为参数传递并在线程中单独运行。对于单个函数,它可以正常工作,但对于多个函数则不行。 - Madan Raj

6
它们将不会并行运行。 请看以下代码:
def updater1(q,i):    
    print "UPDATER 1:", i
    return

def updater2(q,i):    
    print "UPDATER2:", i
    return

if __name__=='__main__':
    a = range(10)
    b=["abc","def","ghi","jkl","mno","pqr","vas","dqfq","grea","qfwqa","qwfsa","qdqs"]


    pool = multiprocessing.Pool()

    func1 = partial(updater1,q)
    func2 = partial(updater2,q)
    pool.map_async(func1, a)
    pool.map_async(func2, b)

    pool.close()
    pool.join()

以上代码输出如下结果:
UPDATER 1: 1
UPDATER 1: 0
UPDATER 1: 2
UPDATER 1: 3
UPDATER 1: 4
UPDATER 1: 5
UPDATER 1: 6
UPDATER 1: 7
UPDATER 1: 8
UPDATER 1: 9
UPDATER2: abc
UPDATER2: def
UPDATER2: ghi
UPDATER2: jkl
UPDATER2: mno
UPDATER2: pqr
UPDATER2: vas
UPDATER2: dqfq
UPDATER2: grea
UPDATER2: qfwqa
UPDATER2: qwfsa
UPDATER2: qdqs

这个答案对于Python3来说要么过时,要么让事情变得混乱。你的代码无法运行,因为它包含一个神秘的'q'变量。此外,这并不能证明代码在串行中运行-在updater1函数中添加一个sleep实际上会使输出交错,从而证明两个任务列表之间没有隐式的阻塞。查看_map_async的cpython源代码告诉我,这两个列表的结构甚至不会被保留-所有的任务都被放入一个任务队列中。https://github.com/python/cpython/blob/main/Lib/multiprocessing/pool.py - julaine
这个答案要么过时了针对Python3,要么让事情变得混乱。你的代码无法运行,因为它包含一个神秘的'q'变量。此外,这并不能证明代码是串行运行的 - 在updater1函数中添加一个sleep实际上会使输出交错,从而证明两个任务列表之间没有隐式的阻塞。查看_cpython-source中的_map_async告诉我,这两个列表的结构甚至都不会被保留 - 所有任务都被放入一个任务队列中。https://github.com/python/cpython/blob/main/Lib/multiprocessing/pool.py - undefined

4

一个池中多个函数

以下示例展示如何在一个池中运行三个函数inc, decadd

from multiprocessing import Pool
import functools

# -------------------------------------

def inc(x):
    return x + 1

def dec(x):
    return x - 1

def add(x, y):
    return x + y

# -------------------------------------

def smap(f):
    return f()

def main():
    f_inc = functools.partial(inc, 4)
    f_dec = functools.partial(dec, 2)
    f_add = functools.partial(add, 3, 4)
    with Pool() as pool:
        res = pool.map(smap, [f_inc, f_dec, f_add])
        print(res)

# -------------------------------------

if __name__ == '__main__':
    main()

我们有三个函数,在池中独立运行。在执行之前,我们使用functools.partial准备函数及其参数。
来源:https://zetcode.com/python/multiprocessing/

1

为了进一步解释上面的答案,以下是一个例子:

  1. 使用 Pool 并行运行具有多个输入的单个函数(平方函数),有趣的是“5 981 25”行上的混淆操作
  2. 使用 Pool 运行具有不同输入(args 和 kwargs)的多个函数,并收集它们的结果(pf1、pf2、pf3 函数)
import datetime
import multiprocessing
import time
import random

from multiprocessing import Pool

def square(x):
    # calculate the square of the value of x
    print(x, x*x)
    return x*x

def pf1(*args, **kwargs):
    sleep_time = random.randint(3, 6)
    print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n" % (multiprocessing.current_process().name, "pf1", args, sleep_time, datetime.datetime.now()))
    print("Keyword Args from pf1: %s" % kwargs)
    time.sleep(sleep_time)
    print(multiprocessing.current_process().name, "\tpf1 done at %s\n" % datetime.datetime.now())
    return (sum(*args), kwargs)

def pf2(*args):
    sleep_time = random.randint(7, 10)
    print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n" % (multiprocessing.current_process().name, "pf2", args, sleep_time, datetime.datetime.now()))
    time.sleep(sleep_time)
    print(multiprocessing.current_process().name, "\tpf2 done at %s\n" % datetime.datetime.now())
    return sum(*args)

def pf3(*args):
    sleep_time = random.randint(0, 3)
    print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n" % (multiprocessing.current_process().name, "pf3", args, sleep_time, datetime.datetime.now()))
    time.sleep(sleep_time)
    print(multiprocessing.current_process().name, "\tpf3 done at %s\n" % datetime.datetime.now())
    return sum(*args)

def smap(f, *arg):
    if len(arg) == 2:
        args, kwargs = arg
        return f(list(args), **kwargs)
    elif len(arg) == 1:
        args = arg
        return f(*args)


if __name__ == '__main__':

    # Define the dataset
    dataset = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

    # Output the dataset
    print ('Dataset: ' + str(dataset))

    # Run this with a pool of 5 agents having a chunksize of 3 until finished
    agents = 5
    chunksize = 3
    with Pool(processes=agents) as pool:
        result = pool.map(square, dataset)
    print("Result of Squares : %s\n\n" % result)
    with Pool(processes=3) as pool:
        result = pool.starmap(smap, [(pf1, [1,2,3], {'a':123, 'b':456}), (pf2, [11,22,33]), (pf3, [111,222,333])])

    # Output the result
    print ('Result: %s ' % result)


Output:
*******

Dataset: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
1 1
2 4
3 9
4 16
6 36
7 49
8 64
59 81
 25
10 100
11 121
12 144
13 169
14 196
Result of Squares : [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196]


Process : ForkPoolWorker-6  Function : pf1  Args: ([1, 2, 3],)  sleeping for 3  Time : 2020-07-20 00:51:56.477299

Keyword Args from pf1: {'a': 123, 'b': 456}
Process : ForkPoolWorker-7  Function : pf2  Args: ([11, 22, 33],)   sleeping for 8  Time : 2020-07-20 00:51:56.477371

Process : ForkPoolWorker-8  Function : pf3  Args: ([111, 222, 333],)    sleeping for 1  Time : 2020-07-20 00:51:56.477918

ForkPoolWorker-8    pf3 done at 2020-07-20 00:51:57.478808

ForkPoolWorker-6    pf1 done at 2020-07-20 00:51:59.478877

ForkPoolWorker-7    pf2 done at 2020-07-20 00:52:04.478016

Result: [(6, {'a': 123, 'b': 456}), 66, 666] 

Process finished with exit code 0


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接