多进程池:在使用apply_async的回调选项时调用辅助函数

4

apply_async函数在调用可迭代的函数和回调函数之间如何工作?

设置:我正在读取一个包含2000个文件的目录中的所有文件的某些行,有些文件有数百万行,有些只有几行。从每个文件中提取一些头/格式/日期数据以描述每个文件。这是在16 CPU机器上完成的,因此使用多进程是有意义的。

目前,预期结果被发送到列表(ahlala),以便我可以打印出来;稍后,这将被写入*.csv文件。这是我的代码的简化版本,最初基于this非常有帮助的帖子。

import multiprocessing as mp

def dirwalker(directory):
  ahlala = []

  # X() reads files and grabs lines, calls helper function to calculate
  # info, and returns stuff to the callback function
  def X(f): 
    fileinfo = Z(arr_of_lines) 
    return fileinfo 

  # Y() reads other types of files and does the same thing
  def Y(f): 
    fileinfo = Z(arr_of_lines)
    return fileinfo

  # results() is the callback function
  def results(r):
    ahlala.extend(r) # or .append, haven't yet decided

  # helper function
  def Z(arr):
    return fileinfo # to X() or Y()!

  for _,_,files in os.walk(directory):
    pool = mp.Pool(mp.cpu_count()
    for f in files:
      if (filetype(f) == filetypeX): 
        pool.apply_async(X, args=(f,), callback=results)
      elif (filetype(f) == filetypeY): 
        pool.apply_async(Y, args=(f,), callback=results)

  pool.close(); pool.join()
  return ahlala

请注意,如果我将所有的Z()辅助函数放入X()Y()results()中,代码是可以运行的,但这样做是否会重复或者可能比可能更慢?我知道回调函数在每次函数调用时都会被调用,但回调函数是什么时候被调用的呢?它是在pool.apply_async()完成进程的所有作业之后被调用的吗?如果这些辅助函数在第一个函数pool.apply_async()的范围内(在本例中为X())被调用,那么速度不应该更快吗?如果不是,我应该把辅助函数放在results()中吗?
其他相关想法:守护进程是为什么没有显示任何东西?我也非常困惑如何排队,以及这是否是问题。 这似乎是学习它的起点, 但使用apply_async时是否可以安全地忽略排队,或者只有在时间效率上才能注意到它的影响?

arr_of_lines 是从哪里来的?你将 f 传递给 XY,但是没有使用它,而是使用了 arr_of_lines。你能澄清一下那里发生了什么吗? - dano
嗨,是的!很多事情都在进行中。例如,传递给 X(f) 的 f 是一个 *.zip 文件,其中包含 5 个 *.csv 文件(这代表了几百个文件)。因此,我使用 zipfile.ZipFile 查看文件内容(该方法返回文件名列表),使用 for 循环来使用 csv.reader 读取每个 csv 文件,并传出前两行/行(标题和示例行)-- 这是 arr_of_lines - ehacinom
1个回答

9
您在这里提出了许多不同的问题,我会尽力涵盖所有内容:
传递给callback的函数将在主进程中执行(而不是工作进程),一旦工作进程返回其结果。它在Pool对象内部创建的一个线程中执行。该线程从result_queue中获取对象,用于获取所有工作进程的结果。线程从队列中取出结果后,执行callback。当您的回调函数执行时,无法从队列中拉取其他结果,因此回调快速完成非常重要。就您的示例而言,只要您通过apply_async进行的对XY 的任何一个调用完成,结果将由工作进程放入result_queue,然后结果处理线程将从result_queue中取出结果,并执行callback
其次,我怀疑您未看到任何代码运行结果的原因是因为您的所有工作函数调用都失败了。如果工作函数失败,则将永远不会执行callback。除非您尝试从调用apply_async返回的AsyncResult对象中获取结果,否则不会报告失败。但是,由于您没有保存这些对象中的任何一个,因此您永远不会知道发生了失败。如果我是您,在测试时尝试使用pool.apply,这样您就可以在发生错误时立即看到。
工作进程可能失败的原因(至少在您提供的示例代码中)是因为XY被定义为另一个函数内部的函数。multiprocessing通过在主进程中对它们进行pickle并在工作进程中对它们进行unpickle来向工作进程传递函数和对象。定义在其他函数内部的函数无法进行pickle,这意味着multiprocessing将无法成功在工作进程中对其进行unpickle。要解决此问题,请将两个函数定义在模块的顶层,而不是嵌套在dirwalker函数中。
您应该继续从XY中调用Z,而不是在results中执行。这样,Z可以在所有工作进程中同时运行,而不必在主进程中一次运行一个调用。请记住,您的callback函数应尽可能快地完成,以避免阻塞结果处理。在那里执行Z会使事情变慢。

这里有一些简单的示例代码,与您正在进行的工作类似,希望能让您了解代码应该是什么样子:

import multiprocessing as mp
import os

# X() reads files and grabs lines, calls helper function to calculate
# info, and returns stuff to the callback function
def X(f): 
    fileinfo = Z(f) 
    return fileinfo 

# Y() reads other types of files and does the same thing
def Y(f): 
    fileinfo = Z(f)
    return fileinfo

# helper function
def Z(arr):
    return arr + "zzz"

def dirwalker(directory):
    ahlala = []

    # results() is the callback function
    def results(r):
        ahlala.append(r) # or .append, haven't yet decided

    for _,_,files in os.walk(directory):
        pool = mp.Pool(mp.cpu_count())
        for f in files:
            if len(f) > 5: # Just an arbitrary thing to split up the list with
                pool.apply_async(X, args=(f,), callback=results)  # ,error_callback=handle_error # In Python 3, there's an error_callback you can use to handle errors. It's not available in Python 2.7 though :(
            else:
                pool.apply_async(Y, args=(f,), callback=results)

    pool.close()
    pool.join()
    return ahlala


if __name__ == "__main__":
    print(dirwalker("/usr/bin"))

输出:

['ftpzzz', 'findhyphzzz', 'gcc-nm-4.8zzz', 'google-chromezzz' ... # lots more here ]

编辑:

您可以使用multiprocessing.Manager类创建一个在父进程和子进程之间共享的字典对象:

pool = mp.Pool(mp.cpu_count())
m = multiprocessing.Manager()
helper_dict = m.dict()
for f in files:
    if len(f) > 5:
        pool.apply_async(X, args=(f, helper_dict), callback=results)
    else:
        pool.apply_async(Y, args=(f, helper_dict), callback=results)

接下来,让XY加上一个名为helper_dict(或者你想要的其他名称)的第二个参数,这样就可以了。

需要注意的是,这种方法通过创建一个包含普通字典的服务器进程,使得所有其他进程通过代理对象与该字典进行通信。因此,每次读取或写入字典时,都会进行进程间通信,这使得速度比真正的字典要慢很多。


每个问题在后面的段落中都得到了美妙的解答!:D 通过腌制,解释了为什么它可以在未并行化的代码中工作!遗憾的是,X()Y()被定义在里面是为了速度。助手程序Z()dirwalker内部创建了一个(假?)智能字典,它会记住是否之前已经看到过类似模式的文件(不考虑扩展名/文件类型),从而减少搜索并使用dateutil.parser()。我必须想办法两全其美...我试图避免使用全局变量,但在这个问题中实际上并不糟糕。:) 如果您有时间,有任何建议吗? - ehacinom
1
@rebecca 我编辑了我的答案来解决你的字典问题。你不能只使用普通的全局字典,因为 Z 被跨越许多不同的进程调用,每个进程都会得到自己的全局字典副本。 - dano

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接