Python多进程 - 追踪池映射操作的进程

11
我有一个函数,它执行一些模拟并以字符串格式返回数组。
我想为不同的输入参数值运行模拟(函数),超过10000个可能的输入值,并将结果写入单个文件。
我正在使用多处理,具体来说是pool.map函数来并行运行模拟。
由于运行模拟函数10000次的整个过程需要很长时间,因此我真的希望跟踪整个操作的过程。
我认为我的当前代码的问题在于,pool.map在这些操作期间没有任何进程跟踪地运行函数10000次。一旦并行处理完成运行10000次模拟(可能需要数小时到数天),然后我保持跟踪当10000个模拟结果保存到文件时..所以这实际上并没有跟踪pool.map操作的处理过程。
是否有一种简单的方法可以修复我的代码,以允许进程跟踪?
def simFunction(input):
    # Does some simulation and outputs simResult
    return str(simResult)

# Parallel processing

inputs = np.arange(0,10000,1)

if __name__ == "__main__":
    numCores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes = numCores)
    t = pool.map(simFunction, inputs) 
    with open('results.txt','w') as out:
        print("Starting to simulate " + str(len(inputs)) + " input values...")
        counter = 0
        for i in t:
            out.write(i + '\n')
            counter = counter + 1
            if counter%100==0:
                print(str(counter) + " of " + str(len(inputs)) + " input values simulated")
    print('Finished!!!!')
3个回答

14

请注意,我使用的是 pathos.multiprocessing而不是 multiprocessing 这只是 multiprocessing 的一个分支,它使您能够使用多个输入执行 map 函数,具有更好的序列化,并允许您在任何地方执行 map 调用(而不仅仅是在 __main__ 中)。当然,您也可以使用 multiprocessing 来完成以下操作,但代码可能会略有不同。

如果您使用迭代的 map 函数,跟踪进度就非常容易。

from pathos.multiprocessing import ProcessingPool as Pool
def simFunction(x,y):
  import time
  time.sleep(2)
  return x**2 + y
 
x,y = range(100),range(-100,100,2)
res = Pool().imap(simFunction, x,y)
with open('results.txt', 'w') as out:
  for i in x:
    out.write("%s\n" % res.next())
    if i%10 is 0:
      print "%s of %s simulated" % (i, len(x))
0 of 100 simulated
10 of 100 simulated
20 of 100 simulated
30 of 100 simulated
40 of 100 simulated
50 of 100 simulated
60 of 100 simulated
70 of 100 simulated
80 of 100 simulated
90 of 100 simulated
或者,您可以使用异步的 map。这里我会稍微做点不同的事情,以使它更加丰富多彩。
import time
res = Pool().amap(simFunction, x,y)
while not res.ready():
  print "waiting..."
  time.sleep(5)
 
waiting...
waiting...
waiting...
waiting...
res.get()
[-100, -97, -92, -85, -76, -65, -52, -37, -20, -1, 20, 43, 68, 95, 124, 155, 188, 223, 260, 299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1835, 1924, 2015, 2108, 2203, 2300, 2399, 2500, 2603, 2708, 2815, 2924, 3035, 3148, 3263, 3380, 3499, 3620, 3743, 3868, 3995, 4124, 4255, 4388, 4523, 4660, 4799, 4940, 5083, 5228, 5375, 5524, 5675, 5828, 5983, 6140, 6299, 6460, 6623, 6788, 6955, 7124, 7295, 7468, 7643, 7820, 7999, 8180, 8363, 8548, 8735, 8924, 9115, 9308, 9503, 9700, 9899]

使用迭代或异步的 map 都可以让你编写任何你想要进行更好进程跟踪的代码。例如,给每个作业传递一个唯一的“id”,并观察哪些作业返回,或让每个作业返回其进程id。有很多跟踪进度和进程的方法...但以上应该能给你一个起点。

您可以在这里获取 pathos


5
我认为你需要的是一个日志文件。
我建议你使用Python标准库中的logging模块。但不幸的是,logging在多进程环境下是不安全的。因此,在应用程序中不能直接使用它。
所以,你需要使用一个多进程安全的日志处理程序或者使用队列或锁来实现自己的日志处理程序,同时使用logging模块。
关于这个问题在Stackoverflow上有很多讨论,比如:How should I log while using multiprocessing in Python? 如果大部分CPU负载都在仿真函数中,并且你不会使用日志轮换,则可以使用类似于以下代码的简单锁机制:
import multiprocessing
import logging

from random import random
import time


logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(process)s %(levelname)s %(message)s',
    filename='results.log',
    filemode='a'
)


def simulation(a):
    # logging
    with multiprocessing.Lock():
        logging.debug("Simulating with %s" % a)

    # simulation
    time.sleep(random())
    result = a*2

    # logging
    with multiprocessing.Lock():
        logging.debug("Finished simulation with %s. Result is %s" % (a, result))

    return result

if __name__ == '__main__':

    logging.debug("Starting the simulation")
    inputs = [x for x in xrange(100)]
    num_cores = multiprocessing.cpu_count()
    print "num_cores: %d" % num_cores
    pool = multiprocessing.Pool(processes=num_cores)
    t = pool.map(simulation, inputs)
    logging.debug("The simulation has ended")

您可以在运行时使用“tail -f”命令跟踪日志文件。以下是您应该看到的内容:
2015-02-08 18:10:00,616 3957 DEBUG Starting the simulation
2015-02-08 18:10:00,819 3961 DEBUG Simulating with 12
2015-02-08 18:10:00,861 3965 DEBUG Simulating with 28
2015-02-08 18:10:00,843 3960 DEBUG Simulating with 20
2015-02-08 18:10:00,832 3959 DEBUG Simulating with 16
2015-02-08 18:10:00,812 3958 DEBUG Simulating with 8
2015-02-08 18:10:00,798 3963 DEBUG Simulating with 4
2015-02-08 18:10:00,855 3964 DEBUG Simulating with 24
2015-02-08 18:10:00,781 3962 DEBUG Simulating with 0
2015-02-08 18:10:00,981 3961 DEBUG Finished simulation with 12. Result is 24
2015-02-08 18:10:00,981 3961 DEBUG Simulating with 13
2015-02-08 18:10:00,991 3958 DEBUG Finished simulation with 8. Result is 16
2015-02-08 18:10:00,991 3958 DEBUG Simulating with 9
2015-02-08 18:10:01,130 3964 DEBUG Finished simulation with 24. Result is 48
2015-02-08 18:10:01,131 3964 DEBUG Simulating with 25
2015-02-08 18:10:01,134 3964 DEBUG Finished simulation with 25. Result is 50
2015-02-08 18:10:01,134 3964 DEBUG Simulating with 26
2015-02-08 18:10:01,315 3961 DEBUG Finished simulation with 13. Result is 26
2015-02-08 18:10:01,316 3961 DEBUG Simulating with 14
2015-02-08 18:10:01,391 3961 DEBUG Finished simulation with 14. Result is 28
2015-02-08 18:10:01,391 3961 DEBUG Simulating with 15
2015-02-08 18:10:01,392 3963 DEBUG Finished simulation with 4. Result is 8
2015-02-08 18:10:01,393 3963 DEBUG Simulating with 5

尝试在Windows和Linux上运行。
希望这可以帮助。

multiprocessing.get_logger() 返回受锁保护的功能有限的记录器,详情见 https://docs.python.org/2/library/multiprocessing.html#logging - Dr. Jan-Philip Gehrcke
是的,但这是模块记录器...所以尽管您可以使用它,但您的日志将与模块级别消息混合:尝试一下,您会看到像这样的消息:2015-02-08 23:47:10,954 9288 DEBUG created semlock with handle 448。 - Juan Fco. Roco
哦,你说得对,我从来没有使用过它,文档也翻阅地太快了。 - Dr. Jan-Philip Gehrcke

4

没有“简单的解决方案”。map就是为了将实现细节隐藏起来。而在这种情况下,你需要详细的信息。也就是说,事情变得有点复杂,这是定义上的。你需要改变通信范例。有许多方法可以做到这一点。

其中一种方法是:创建一个队列来收集结果,让你的工作线程将结果放入该队列中。然后,在监控线程或进程中,您可以查看队列,并在结果到达时消耗它们。在消费过程中,您可以分析它们并生成日志输出。这可能是跟踪进度最通用的方法:您可以实时以任何方式响应传入的结果。

更简单的方法可能是略微修改您的工作函数,并在其中生成日志输出。通过使用外部工具(例如grepwc)仔细分析日志输出,您可以找到非常简单的跟踪手段。


1
谢谢。你能提供一些简单的例子吗? - user32147

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接