如果有人想在他们的自定义并行 pandas-apply 代码上应用 tqdm,以下是一些帮助:
多年来,我尝试过一些并行化库,但我从未找到100%的并行化解决方案,主要是应用函数,我总是不得不回到我的"手动"代码。
df_multi_core - 你需要调用它。它接受:
- 您的 df 对象
- 您想要调用的函数名称
- 可以执行该函数的列的子集(有助于减少时间/内存)
- 要并行运行的作业数(-1或省略以使用所有内核)
- df 函数可接受的任何其他 kwargs (如"axis")
_df_split - 这是一个内部辅助函数,必须全局定位到运行模块(Pool.map 是"放置依赖项"的),否则我会将其定位为内部函数。
以下是来自我的 gist 的代码(我将在那里添加更多的 pandas 函数测试):
import pandas as pd
import numpy as np
import multiprocessing
from functools import partial
def _df_split(tup_arg, **kwargs):
split_ind, df_split, df_f_name = tup_arg
return (split_ind, getattr(df_split, df_f_name)(**kwargs))
def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
if njobs == -1:
njobs = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=njobs)
try:
splits = np.array_split(df[subset], njobs)
except ValueError:
splits = np.array_split(df, njobs)
pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
results = pool.map(partial(_df_split, **kwargs), pool_data)
pool.close()
pool.join()
results = sorted(results, key=lambda x:x[0])
results = pd.concat([split[1] for split in results])
return results
以下是使用tqdm "progress_apply"并行化apply的测试代码。
from time import time
from tqdm import tqdm
tqdm.pandas()
if __name__ == '__main__':
sep = '-' * 50
def apply_f(row):
return row['c1'] + 0.1
N = 1000000
np.random.seed(0)
df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})
print('testing pandas apply on {}\n{}'.format(df.shape, sep))
t1 = time()
res = df.progress_apply(apply_f, axis=1)
t2 = time()
print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))
t3 = time()
res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
t4 = time()
print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))
在输出结果中,你可以看到一条进度条表示单线程运行的进度,而使用并行化时每个核心都有自己的进度条。虽然有时会出现轻微的卡顿和其他核心同时出现的情况,但即使这样,也很有用,因为你能看到每个核心的进度统计信息(例如每秒处理条目数和总记录数)。
感谢 @abcdaa 提供这个很棒的库!