Python多进程 - 进程数量

3

我正在执行以下代码,它运行良好,但它没有生成不同的进程,有时所有进程在同一个进程中运行,有时两个进程在同一个进程中运行。 我使用了一台4个CPU的机器。这个代码出了什么问题吗?

def f(values):
    print(multiprocessing.current_process())
    return values

def main():
    p = Pool(4) #number of processes = number of CPUs
    keys, values= zip(*data.items()) #ordered keys and values
    processed_values= p.map( f, values )
    result= dict( zip(keys, processed_values ) ) 
    p.close() # no more tasks
    p.join()  # wrap up current tasks

结果是

<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>

有时候是这样的情况:

<SpawnProcess(SpawnPoolWorker-3, started daemon)>
<SpawnProcess(SpawnPoolWorker-2, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-3, started daemon)>

有时候,
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-4, started daemon)>
<SpawnProcess(SpawnPoolWorker-2, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>

我的问题是,它根据什么依据将函数分配给工作进程?我的编码方式是基于字典中键的数量来决定进程数(假设我的数据键数始终少于我的CPU)。我的代码将从主代码开始读取文件,并使用单个进程将其制成字典,然后应将其分支到许多并发进程并等待它们处理数据(我正在使用pool.map实现),然后一旦它获得子进程的结果,就开始处理它们。我如何实现父进程等待子进程的步骤?
1个回答

5

你的代码没有问题。只是你的工作项非常快 - 如此之快,以至于同一个工作进程可以运行函数、返回结果,然后赢得竞争并消费 multiprocessing.Pool 使用来分发工作的内部队列中的下一个任务。当你调用 map 时,工作项被分成批次并放入一个 Queue 中。以下是 pool.map 的一部分实现,它将可迭代对象中的项分块并将它们放入队列中:

    task_batches = Pool._get_tasks(func, iterable, chunksize)
    result = MapResult(self._cache, chunksize, len(iterable), callback)
    self._taskqueue.put((((result._job, i, mapstar, (x,), {}) 
                          for i, x in enumerate(task_batches)), None))

每个工作进程都运行一个函数,其中包含一个无限循环,该循环从该队列中消耗项目*:
while maxtasks is None or (maxtasks and completed < maxtasks):
    try:
        task = get()  # Pulls an item off the taskqueue
    except (EOFError, IOError):
        debug('worker got EOFError or IOError -- exiting')
        break

    if task is None:
        debug('worker got sentinel -- exiting')
        break

    job, i, func, args, kwds = task
    try:
        result = (True, func(*args, **kwds))  # Runs the function you passed to map
    except Exception, e:
        result = (False, e)
    try:
        put((job, i, result))  # Sends the result back to the parent
    except Exception as e:
        wrapped = MaybeEncodingError(e, result[1])
        debug("Possible encoding error while sending result: %s" % (
            wrapped))

很可能同一个工作人员只是偶然地能够消耗一项,运行func,然后消耗下一项。这有点奇怪-我无法在我的机器上复制您的示例代码运行相同的代码-但如果同一个工作人员从队列中获取了四个项目中的两个,则非常正常。
如果使您的工作人员函数变得更长,通过插入调用time.sleep,则应始终看到均匀分布:
def f(values):
    print(multiprocessing.current_process())
    time.sleep(1)
    return values

* 这其实并不完全正确 - 主进程中有一个线程从 taskqueue 中消费任务,并将取出的内容放入另一个 Queue 中,子进程从那个队列中消费任务。


1
@Jeeva pool.map 不会在子进程处理完你传入的可迭代对象(例如你的示例中的 data)之前返回。你的示例中的 processed_values 列表将包含每次执行 f 的结果。因此,你不需要做任何比你已经做的更多的事情。 - dano
问题在于 - task_batches = Pool._get_tasks(func, iterable, chunksize),我希望我的函数能够应用于字典中的每个键。它不是一个确定的块大小。因此,基本上 n_process = 字典中的 n_keys。相同的函数并行运行 n_key 次。 - ds_user
@Jeeva 在你的例子中,块处理仅用于在有大型可迭代对象时加速进程间通信。在这种情况下,块大小将为1,因此每个任务都将被单独排队。 - dano
哎呀。为什么是1?应该是4吧?因为我有四个键。或者这是怎么决定的? - ds_user
让我们在聊天中继续这个讨论。点击此处进入聊天室 - ds_user
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接