apply_async
函数在调用可迭代的函数和回调函数之间如何工作?
设置:我正在读取一个包含2000个文件的目录中的所有文件的某些行,有些文件有数百万行,有些只有几行。从每个文件中提取一些头/格式/日期数据以描述每个文件。这是在16 CPU机器上完成的,因此使用多进程是有意义的。
目前,预期结果被发送到列表(ahlala
),以便我可以打印出来;稍后,这将被写入*.csv文件。这是我的代码的简化版本,最初基于this非常有帮助的帖子。
import multiprocessing as mp
def dirwalker(directory):
ahlala = []
# X() reads files and grabs lines, calls helper function to calculate
# info, and returns stuff to the callback function
def X(f):
fileinfo = Z(arr_of_lines)
return fileinfo
# Y() reads other types of files and does the same thing
def Y(f):
fileinfo = Z(arr_of_lines)
return fileinfo
# results() is the callback function
def results(r):
ahlala.extend(r) # or .append, haven't yet decided
# helper function
def Z(arr):
return fileinfo # to X() or Y()!
for _,_,files in os.walk(directory):
pool = mp.Pool(mp.cpu_count()
for f in files:
if (filetype(f) == filetypeX):
pool.apply_async(X, args=(f,), callback=results)
elif (filetype(f) == filetypeY):
pool.apply_async(Y, args=(f,), callback=results)
pool.close(); pool.join()
return ahlala
请注意,如果我将所有的
Z()
辅助函数放入X()
、Y()
或results()
中,代码是可以运行的,但这样做是否会重复或者可能比可能更慢?我知道回调函数在每次函数调用时都会被调用,但回调函数是什么时候被调用的呢?它是在pool.apply_async()
完成进程的所有作业之后被调用的吗?如果这些辅助函数在第一个函数pool.apply_async()
的范围内(在本例中为X()
)被调用,那么速度不应该更快吗?如果不是,我应该把辅助函数放在results()
中吗?其他相关想法:守护进程是为什么没有显示任何东西?我也非常困惑如何排队,以及这是否是问题。 这似乎是学习它的起点, 但使用
apply_async
时是否可以安全地忽略排队,或者只有在时间效率上才能注意到它的影响?
arr_of_lines
是从哪里来的?你将f
传递给X
和Y
,但是没有使用它,而是使用了arr_of_lines
。你能澄清一下那里发生了什么吗? - danof
是一个 *.zip 文件,其中包含 5 个 *.csv 文件(这代表了几百个文件)。因此,我使用zipfile.ZipFile
查看文件内容(该方法返回文件名列表),使用for
循环来使用csv.reader
读取每个 csv 文件,并传出前两行/行(标题和示例行)-- 这是arr_of_lines
。 - ehacinom