Python的ProcessPoolExecutor内存问题

3
这是在Linux上,使用Python 3.8。我使用ProcessPoolExecutor来加速处理一系列大型数据框,但由于它们在每个进程中都被复制,导致内存不足。我该如何解决这个问题? 我的代码如下:
def some_func(df):
   # do some work on a single pandas DataFrame
   return single_df # returns a single pandas DataFrame 

# dfs is a list of 100 dataframes
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
     dfs = list(executor.map(some_func, dfs) # here entire dfs gets copied 4 times?
     executor.shutdown(wait=True)

我希望尽量减少不必要的数据复制,即最小化我的内存占用。有什么好的解决方案吗?


如果您可以转而使用单独的numpy数组而不是整个数据帧,那么您可以使用multiprocessing.shared_memory创建一个共享内存缓冲区,多个进程可以同时访问它。由于有大量的数组(100个数据帧*每个数据帧的列数),您可能需要创建一些管理代码来跟踪它们... - Aaron
1个回答

3
似乎您的内存问题与需要足够的存储空间来容纳双倍或200个数据框有关,假设some_func确实隐含地返回None。也就是说,您的主进程正在存储最初的100个数据框,然后将这些数据框复制到任务队列中。但是,当数据框从任务队列中移除时,它将在池进程的地址空间中暂时占用存储空间,直到该进程完成并从池中取下下一个数据框。这会导致内存利用率随着处理池从任务队列中耗尽任务而降低,直到我们回到起点(或者更多)。但是,如果这些数据框可以比您的工作函数some_func处理数据框的速度更快地添加到队列中,则高水位线将约为200个数据框。如果some_func实际上返回数据框,则内存使用量不会随着任务完成而减少。
最简单的方法是尝试不一次提交所有100个数据框到池中,而是将列表分成4个块(池大小),这样您的内存利用率的高水位线应该约为104个数据框,同样假设some_func没有返回数据框(例如,只是将其写出)。
def some_func(df):
   # do some work on a single pandas DataFrame
   pass


# dfs is a list of 100 dataframes
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    start = 0
    cnt = len(dfs)
    while start < cnt:
        # Do chunks of 4:
        list(executor.map(some_func, dfs[start:start+4]))
        start += 4
#executor.shutdown(wait=True) is done implicitly at end of the above block

更新

由于我们现在知道some_func实际上返回的是一个数据帧,我认为你完成后不需要原始数据帧列表。如果这个假设是错误的,那么你需要存储200个数据帧,对吗?

因此,现在我们仍然将任务分成4块,并用结果数据框替换输入数据框:

def some_func(df):
   # do some work on a single pandas DataFrame
   return single_df # returns a single pandas DataFrame

# dfs is a list of 100 dataframes
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    start = 0
    cnt = len(dfs)
    while start < cnt:
        # Do chunks of 4:
        dfs[start:start+4] = list(executor.map(some_func, dfs[start:start+4]))
        start += 4
    #executor.shutdown(wait=True) is done implicitly at end of the above block

除了对这行代码的担忧之外,其他都很合理: dfs = list(executor.map(some_func, dfs[start:start+4]))。这将使用从此调用返回的 4 个列表覆盖大小为 100 的输入 dfs 列表。最干净的避免这个问题的方法是什么,同时又要确保大列表 dfs 不会被复制到子进程中呢? 这似乎有点像进退两难的局面。 - user623949
我的错误。只要 some_func 没有返回任何有用的内容,就没有理由除了将其转换为列表来迭代 map 的返回值。不要将该值重新赋回给列表。 - Booboo
请查看更新后的答案。 - Booboo
是的,你理解得没错,谢谢。但我关心的是这一行代码: dfs[start:start+4] = list(executor.map(some_func, dfs[start:start+4])) 我们确定进程只会接收到 dfs 列表的那个切片的副本,而不是整个列表吗?因为我认为所有变量(整个上下文)都会被复制到每个进程中,这是我的担忧。 - user623949
你的代码中没有展示这100个数据框是如何创建的。它们真的需要一次性全部创建然后传递吗?在结果返回后,它们会被用来做什么?你是否需要同时得到这100个结果?可能有其他创造性的方法来解决内存问题。你不必回答这些问题——它们只是让你思考的东西。但有时候当人们提供的可重现示例太少时,他们就无法得到最好的答案——甚至是任何答案。 - Booboo
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接