在Python多进程池中为工作进程获取唯一ID

76
有没有一种方法可以为Python多进程池中的每个worker分配唯一的ID,以使由池中特定worker运行的作业能够知道正在运行它的worker是哪个?根据文档,Process具有name,但是该名称仅用于识别目的,没有语义。多个进程可能会被赋予相同的名称。对于我的特定用例,我想在四个GPU组上运行一堆作业,并且需要设置作业应在其中运行的GPU的设备号。由于作业长度不均,因此我想确保在前一个作业完成之前不会发生尝试在其上运行作业的冲突(因此排除了提前为工作单元分配ID的可能性)。

2
为什么不使用像uuid这样的随机值呢? - Luper Rouch
@LuperRouch - 你能详细说明你的意思吗? - JoshAdel
2
例如,process = Process(target=foo, name=uuid.uuid4().hex) 可以为您的进程提供唯一的名称。 - Luper Rouch
请注意,为工作人员获取唯一ID和为就业岗位获取唯一ID是不同的事情。真正想要为就业岗位获得唯一ID的读者可以只做 p.map(f, enumerate(jobs)),更改 f 的签名为 f(arg),并将 i, x = arg 拆分为 f 的第一行。 - Attila the Fun
6个回答

121
似乎你想要的很简单:multiprocessing.current_process()。例如:
import multiprocessing

def f(x):
    print multiprocessing.current_process()
    return x * x

p = multiprocessing.Pool()
print p.map(f, range(6))

输出:

$ python foo.py 
<Process(PoolWorker-1, started daemon)>
<Process(PoolWorker-2, started daemon)>
<Process(PoolWorker-3, started daemon)>
<Process(PoolWorker-1, started daemon)>
<Process(PoolWorker-2, started daemon)>
<Process(PoolWorker-4, started daemon)>
[0, 1, 4, 9, 16, 25]

这将返回进程对象本身,因此进程可以成为自己的标识。您还可以调用id以获得唯一的数字ID - 在cpython中,这是进程对象的内存地址,因此我认为不存在任何重叠的可能性。最后,您可以使用进程的identpid属性 - 但这只有在进程启动后才设置。

此外,从源代码来看,我认为自动生成的名称(如上述Process repr字符串中的第一个值所示)是唯一的。对于每个进程,multiprocessing维护一个itertools.counter对象,用于为它所生成的任何子进程生成_identity元组。因此,顶级进程产生具有单值ID的子进程,它们又生成具有双值ID的进程,以此类推。然后,如果未向Process构造函数传递名称,它将仅基于_identity自动生成名称,使用':'.join(...)。然后,Pool更改进程的名称,并使用replace,使自动生成的ID保持不变。

这一切的结果是,尽管两个Process的名称可能相同,因为在创建它们时可以分配相同的名称,但如果您不触及名称参数,则它们是唯一的。此外,理论上可以使用_identity作为唯一标识符;但我想他们之所以将该变量设置为私有,是有原因的!
以上内容的示例:
import multiprocessing

def f(x):
    created = multiprocessing.Process()
    current = multiprocessing.current_process()
    print 'running:', current.name, current._identity
    print 'created:', created.name, created._identity
    return x * x

p = multiprocessing.Pool()
print p.map(f, range(6))

输出:

$ python foo.py 
running: PoolWorker-1 (1,)
created: Process-1:1 (1, 1)
running: PoolWorker-2 (2,)
created: Process-2:1 (2, 1)
running: PoolWorker-3 (3,)
created: Process-3:1 (3, 1)
running: PoolWorker-1 (1,)
created: Process-1:2 (1, 2)
running: PoolWorker-2 (2,)
created: Process-2:2 (2, 2)
running: PoolWorker-4 (4,)
created: Process-4:1 (4, 1)
[0, 1, 4, 9, 16, 25]

id属性似乎已经不存在了,但是pidident存在(它们是相同的;pid只是ident的别名)。 - Attila the Fun

8
您可以使用 multiprocessing.Queue 存储这些id,然后在池进程的初始化中获取该 id。

优点:

  • 您不需要依赖内部实现。
  • 如果您的用例是管理资源/设备,那么您可以直接放入设备编号。这也将确保不会重复使用任何设备:如果您的进程池中有更多进程而没有足够的设备,则其他进程将在 queue.get() 上阻塞并且不执行任何工作(当我测试时,这不会阻止您的程序,或者至少没有)。

缺点:

  • 您需要额外的通信开销,并且生成池进程需要稍微多花一点时间:例如,在示例中没有 sleep(1) ,所有工作可能都由第一个进程执行,因为其他进程尚未初始化完毕。
  • 您需要全局变量(或者至少我不知道有什么办法绕过它)。

示例:

import multiprocessing
from time import sleep

def init(queue):
    global idx
    idx = queue.get()

def f(x):
    global idx
    process = multiprocessing.current_process()
    sleep(1)
    return (idx, process.pid, x * x)

ids = [0, 1, 2, 3]
manager = multiprocessing.Manager()
idQueue = manager.Queue()

for i in ids:
    idQueue.put(i)

p = multiprocessing.Pool(8, init, (idQueue,))
print(p.map(f, range(8)))

输出:

[(0, 8289, 0), (1, 8290, 1), (2, 8294, 4), (3, 8291, 9), (0, 8289, 16), (1, 8290, 25), (2, 8294, 36), (3, 8291, 49)]

请注意,尽管此池包含8个进程并且一个idx仅被一个进程使用,但只有4个不同的pid。

1
我使用线程完成了这个任务,并使用队列来处理作业管理。以下是基准版本。我的完整版本有很多try-catches(特别是在工作器中),以确保即使失败也会调用q.task_done()
from threading import Thread
from queue import Queue
import time
import random


def run(idx, *args):
    time.sleep(random.random() * 1)
    print idx, ':', args


def run_jobs(jobs, workers=1):
    q = Queue()
    def worker(idx):
        while True:
            args = q.get()
            run(idx, *args)
            q.task_done()

    for job in jobs:
        q.put(job)

    for i in range(0, workers):
        t = Thread(target=worker, args=[i])
        t.daemon = True
        t.start()

    q.join()


if __name__ == "__main__":
    run_jobs([('job', i) for i in range(0,10)], workers=5)

我不需要使用多进程(我的工作程序只是用于调用外部进程),但这可以扩展。多进程的API会有一些变化,以下是如何进行适应:

from multiprocessing import Process, Queue
from Queue import Empty
import time
import random

def run(idx, *args):
    time.sleep(random.random() * i)
    print idx, ':', args


def run_jobs(jobs, workers=1):
    q = Queue()
    def worker(idx):
        try:
            while True:
                args = q.get(timeout=1)
                run(idx, *args)
        except Empty:
            return

    for job in jobs:
        q.put(job)

    processes = []
    for i in range(0, workers):
        p = Process(target=worker, args=[i])
        p.daemon = True
        p.start()
        processes.append(p)

    for p in processes: 
        p.join()


if __name__ == "__main__":
    run_jobs([('job', i) for i in range(0,10)], workers=5)

两个版本都会输出类似以下的内容:
0 : ('job', 0)
1 : ('job', 2)
1 : ('job', 6)
3 : ('job', 3)
0 : ('job', 5)
1 : ('job', 7)
2 : ('job', 1)
4 : ('job', 4)
3 : ('job', 8)
0 : ('job', 9)

0

我成功地通过使用getattr获取函数句柄,然后使用包装器来打包和解包我想要传递给映射方法的任意数量参数,实现了将映射到类方法。在我的情况下,我正在从启动池的同一类中传递方法,但您也可以传递一个对象以映射到不同的类。

以下是代码:

import multiprocessing
from multiprocessing import Pool


def warp(args):
    func = args[0]
    frame = args[1]
    left_over = args[2:]
    func(frame, *left_over)


class MyClass:

    def __init__(self):
        self.my_flag = 5

    def exec_method(self, method, int_list, *args):
        obj = getattr(self, method.__name__)

        packed = list()
        for i in int_list:
            pack = list()
            pack.append(obj)
            pack.append(i)
            for arg in args:
                pack.append(arg)
            packed.append(pack)

        print("Start")
        pool = Pool(processes=multiprocessing.cpu_count())
        pool.map(warp, packed)
        print("End")

    def method1(self, my_str):
        print(self.my_flag, my_str)

    def method2(self, i, print_str, bool_flat):
        print(multiprocessing.current_process(), self.my_flag, i, print_str, str(bool_flat))


cls: MyClass = MyClass()
cls.my_flag = 58
cls.exec_method(cls.method2, [1, 5, 10, 20, 30], "this is a string", True)

这是输出结果:

Start
<ForkProcess(ForkPoolWorker-1, started daemon)> 58 1 this is a string True
<ForkProcess(ForkPoolWorker-2, started daemon)> 58 5 this is a string True
<ForkProcess(ForkPoolWorker-4, started daemon)> 58 20 this is a string True
<ForkProcess(ForkPoolWorker-5, started daemon)> 58 30 this is a string True
<ForkProcess(ForkPoolWorker-3, started daemon)> 58 10 this is a string True
End

0

我不确定如何在Pool中使用它,但打印Process会产生一些独特的输出:

x = Process(target=time.sleep, args=[20])
x.start()
print(x)  # <Process name='Process-5' pid=97121 parent=95732 started>

0

这里有另一种可能被考虑的方法:

import multiprocessing
import math

def worker(worker_id, pid_dict, data_dict, worker_dict, res_dict, nb_worker):
    current = multiprocessing.current_process()
    pid_dict[worker_id] = current.pid
    worker_dict[worker_id] = worker_id 
    data_len = len(data_dict)
    chunk_size = math.ceil(data_len / nb_worker)
    start_index = worker_id * chunk_size
    end_index = min(((worker_id + 1) * chunk_size - 1), data_len)
    res_dict[worker_id] = 0
    
    for i in range(start_index, end_index):
        res_dict[worker_id] = res_dict[worker_id] + data_dict[i]

if __name__ == "__main__":
    nb_worker = 7
    manager = multiprocessing.Manager()
    pid_dict = manager.dict()
    worker_dict = manager.dict()
    data_dict = manager.dict()
    res_dict = manager.dict()
        
    for i in range(100000):
        data_dict[i] = i
    
    jobs = []
    
    for i in range(nb_worker):
        p = multiprocessing.Process(target = worker, args = (i, pid_dict, data_dict, worker_dict, res_dict, nb_worker))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
        
    print("Pid")
    print(pid_dict.values())
    print(" ")
    print("Worker id")
    print(worker_dict.values())
    print(" ")
    print("Sum data by worker")
    print(res_dict.values())
    print(" ")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接