我正在执行以下代码,它运行良好,但它没有生成不同的进程,有时所有进程在同一个进程中运行,有时两个进程在同一个进程中运行。 我使用了一台4个CPU的机器。这个代码出了什么问题吗?
def f(values):
print(multiprocessing.current_process())
return values
def main():
p = Pool(4) #number of processes = number of CPUs
keys, values= zip(*data.items()) #ordered keys and values
processed_values= p.map( f, values )
result= dict( zip(keys, processed_values ) )
p.close() # no more tasks
p.join() # wrap up current tasks
结果是
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
有时候是这样的情况:
<SpawnProcess(SpawnPoolWorker-3, started daemon)>
<SpawnProcess(SpawnPoolWorker-2, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-3, started daemon)>
有时候,
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-4, started daemon)>
<SpawnProcess(SpawnPoolWorker-2, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
我的问题是,它根据什么依据将函数分配给工作进程?我的编码方式是基于字典中键的数量来决定进程数(假设我的数据键数始终少于我的CPU)。我的代码将从主代码开始读取文件,并使用单个进程将其制成字典,然后应将其分支到许多并发进程并等待它们处理数据(我正在使用pool.map实现),然后一旦它获得子进程的结果,就开始处理它们。我如何实现父进程等待子进程的步骤?
pool.map
不会在子进程处理完你传入的可迭代对象(例如你的示例中的data
)之前返回。你的示例中的processed_values
列表将包含每次执行f
的结果。因此,你不需要做任何比你已经做的更多的事情。 - dano1
,因此每个任务都将被单独排队。 - dano