如何修复concurrent.futures ProcessPoolExecutor中出现的BrokenProcessPool错误

8
使用concurrent.futures.ProcessPoolExecutor,我尝试并行运行第一段代码来执行函数"Calculate_Forex_Data_Derivatives(data,gride_spacing)"。当调用结果executor_list[i].result()时,我会收到"BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending."的错误信息。我已经尝试将函数的多个调用发送到处理池中以及仅发送一个调用到处理池中来运行代码,但两种方式都导致了错误。
我还使用了一段更简单的代码结构(第二段提供的代码),使用相同类型的输入调用函数,它可以正常工作。我唯一看到的两段代码之间的不同之处是第一段代码从'findiff'模块调用函数"FinDiff(axis,grid_spacing,derivative_order)"。这个函数连同"Calculate_Forex_Data_Derivatives(data,gride_spacing)"在串行运行时工作得很好。
我使用Anaconda环境、Spyder编辑器和Windows操作系统。
任何帮助将不胜感激。
#code that returns "BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending."

import pandas as pd
import numpy as np
from findiff import FinDiff
import multiprocessing
import concurrent.futures

def Calculate_Forex_Data_Derivatives(forex_data,dt):  #function to run in parallel
    try:
        dClose_dt = FinDiff(0,dt,1)(forex_data)[-1]
    except IndexError:
        dClose_dt = np.nan

    try:   
        d2Close_dt2 = FinDiff(0,dt,2)(forex_data)[-1]
    except IndexError:
        d2Close_dt2 = np.nan

    try:
        d3Close_dt3 = FinDiff(0,dt,3)(forex_data)[-1]
    except IndexError:
        d3Close_dt3 = np.nan

    return dClose_dt, d2Close_dt2, d3Close_dt3

#input for function
#forex_data is pandas dataframe, forex_data['Close'].values is numpy array
#dt is numpy array
#input_1 and input_2 are each a list of numpy arrays

input_1 = []
input_2 = []
for forex_data_index,data_point in enumerate(forex_data['Close'].values[:1]):
    input_1.append(forex_data['Close'].values[:forex_data_index+1])
    input_2.append(dt[:forex_data_index+1])


def multi_processing():
    executors_list = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
        for index in range(len(input_1)):
            executors_list.append(executor.submit(Calculate_Forex_Data_Derivatives,input_1[index],input_2[index]))

    return executors_list

if __name__ == '__main__':
    print('calculating derivatives')
    executors_list = multi_processing()

for output in executors_list
    print(output.result()) #returns "BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending."


##############################################################


#simple example that runs fine

def function(x,y):  #function to run in parallel
    try:
        asdf
    except NameError:
        a = (x*y)[0]
        b = (x+y)[0]

    return  a,b

x=[np.array([0,1,2]),np.array([3,4,5])]    #function inputs, list of numpy arrays
y=[np.array([6,7,8]),np.array([9,10,11])]

def multi_processing():    
    executors_list = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
        for index,_ in enumerate(x):
            executors_list.append(executor.submit(function,x[index],y[index]))

    return executors_list

if __name__ == '__main__':
    executors_list = multi_processing()

for output in executors_list:   #prints as expected
    print(output.result())      #(0, 6)
                                #(27, 12)
2个回答

20

我知道三种破坏ProcessPoolExecutor 的管道的典型方法:

操作系统杀死/终止

你的系统遇到限制,最有可能是内存,并开始终止进程。在Windows上,由于fork会克隆内存内容,因此使用大型DataFrame时这种情况并不罕见。

如何识别

  • 检查任务管理器中的内存消耗情况。
  • 除非你的DataFrame占用了一半以上的内存,否则它应该会消失,使用max_workers=1可以避免这种情况,但这并不是唯一的情况。

工作程序的自我终止

子进程的Python实例由于某些错误而终止,但不会引发适当的异常。一个例子就是在导入的C模块中出现segfault。

如何识别

如果没有使用PPE,你的代码可以正常运行,那么我能想到的唯一情况是一些模块不是多进程安全的。它也有可能会在使用max_workers=1时消失。也可以尝试在创建工作程序后立即手动调用函数(在调用executor.submit的for循环之后的一行),从而在主进程中引发错误。 否则很难识别,但我认为这是最不可能的情况。

PPE代码中的异常

处理通信的代码(即子进程端)可能会崩溃,导致出现适当的异常,但遗憾的是无法将其传递给主进程。

如何识别

由于代码(希望)经过了充分测试,主要嫌疑人在于返回数据。它必须被pickle并通过socket发送回去 - 这两个步骤都可能会崩溃。所以你必须检查:

  • 返回数据是否可被pickled?
  • 被pickled的对象是否足够小以便被发送(约为2GB)?

因此,你可以尝试返回一些简单的虚拟数据,或者显式地检查这两个条件:

    if len(pickle.dumps((dClose_dt, d2Close_dt2, d3Close_dt3))) > 2 * 10 ** 9: 
        raise RuntimeError('return data can not be sent!')

在Python 3.7中,该问题已得到修复,并且会返回异常。


感谢您的输入。我尝试了您建议的所有诊断方法。我尝试使用一个核心运行,并且只输入具有一个元素的numpy数组,因此我知道这不是传递太多数据的问题。我仍然认为这是与“findiff”模块有关的问题,因为导入第三方模块确实是我的错误代码和简单示例之间唯一不同的事情。我尝试在代码和函数中的不同位置导入模块,以及在不同位置定义使用该模块的函数。我被卡住了,可能会开始寻找其他路线。 - ZachV
我一直在使用WSL,但是不断出现问题。我知道我的内存使用率非常高(70%~),但从未想过会变得如此之大以至于导致这种情况。 - Mark

6
我在官方文档中找到了这一段:
“主”模块必须能够被工作子进程导入。 这意味着ProcessPoolExecutor将无法在交互式解释器中工作。从提交给ProcessPoolExecutor的可调用项中调用Executor或Future方法会导致死锁。
您尝试过吗?以下对我有效:
if __name__ == '__main__':
     executors_list = multi_processing()
     for output in executors_list:
         print(output.result())

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接