df.apply(myfunc, axis=1)
时,多核计算机将浪费大部分计算时间。如何在并行中使用所有内核来运行数据帧上的应用程序?
df.apply(myfunc, axis=1)
时,多核计算机将浪费大部分计算时间。你可以使用swifter
包:
pip install swifter
import swifter
def some_function(data):
return data * 10
data['out'] = data['in'].swifter.apply(some_function)
apply
,这不会并行化。在这种情况下,即使强制使用dask
也不会创建性能提升,您最好手动拆分数据集并使用multiprocessing
进行并行化。最简单的方法是使用Dask的map_partitions。你需要以下导入(你需要pip install dask
):
import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get
语法是
data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y,z, ...): return <whatever>
res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)
我相信如果您有16个核心,30是适当的分区数量。为了完整起见,我在我的机器上计时了差异(16个核心):
data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)
ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)
def vectorized(): return myfunc(data['col1'], data['col2'] )
t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))
28.16970546543598
t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))
2.708152851089835
t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))
0.010668013244867325
dask apply 在处理分区时,相较于 pandas apply,速度提升了10倍。当然,如果您的函数可以向量化,那就应该这样做 - 在本例中,函数 (y*(x**2+1)
) 可以被轻松向量化,但有很多东西无法向量化。
df = df[~df.index.duplicated()]
删除重复的索引,或者通过df.reset_index(inplace=True)
重置索引。 - Habib Karbasian你可以尝试使用 pandarallel
替代:一个简单有效的工具,可在所有CPU上并行运行您的pandas操作(适用于Linux和macOS)。
from pandarallel import pandarallel
from math import sin
pandarallel.initialize()
# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)
# ALLOWED
def func(x):
return sin(x**2)
df.parallel_apply(func, axis=1)
如果您希望保持原生Python:
import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as pool:
df['newcol'] = pool.map(f, df['col'])
将函数f
并行应用于数据框df
的列col
pandas/core/frame.py
的__setitem__
中得到了一个ValueError: Length of values does not match length of index
。不确定我是否做错了什么,或者将值分配给df['newcol']
是否是线程安全的。 - Rattle我只是想给关于Dask的更新答案。
import dask.dataframe as dd
def your_func(row):
#do something
return row
ddf = dd.from_pandas(df, npartitions=30) # find your own number of partitions
ddf_update = ddf.apply(your_func, axis=1).compute()
在没有使用Dask的情况下,对于我的10万条记录:
CPU时间:用户6分32秒,系统:100毫秒,总共:6分32秒 墙上时间:6分32秒
使用Dask后:
CPU时间:用户5.19秒,系统:784毫秒,总共:5.98秒 墙上时间:1分3秒
为了使用所有物理或逻辑核心,您可以尝试 mapply
作为 swifter
和 pandarallel
的替代方案。
您可以在初始化时设置核心数量(以及分块行为):
import pandas as pd
import mapply
mapply.init(n_workers=-1)
...
df.mapply(myfunc, axis=1)
默认情况下(n_workers=-1
),该软件包使用系统上所有可用的物理CPU。如果您的系统使用超线程(通常会显示出两倍于实际物理CPU数量的逻辑核心),mapply
将会生成一个额外的工作者来优先处理多进程池,而不是系统上的其他进程。
根据您对“所有内核”的定义,您也可以使用所有逻辑内核(请注意,这样做会导致CPU束缚过程争夺物理CPU,可能会减慢操作速度):
import multiprocessing
n_workers = multiprocessing.cpu_count()
# or more explicit
import psutil
n_workers = psutil.cpu_count(logical=True)
这是一个sklearn基础转换器的示例,其中使用了pandas apply进行并行处理。
import multiprocessing as mp
from sklearn.base import TransformerMixin, BaseEstimator
class ParllelTransformer(BaseEstimator, TransformerMixin):
def __init__(self,
n_jobs=1):
"""
n_jobs - parallel jobs to run
"""
self.variety = variety
self.user_abbrevs = user_abbrevs
self.n_jobs = n_jobs
def fit(self, X, y=None):
return self
def transform(self, X, *_):
X_copy = X.copy()
cores = mp.cpu_count()
partitions = 1
if self.n_jobs <= -1:
partitions = cores
elif self.n_jobs <= 0:
partitions = 1
else:
partitions = min(self.n_jobs, cores)
if partitions == 1:
# transform sequentially
return X_copy.apply(self._transform_one)
# splitting data into batches
data_split = np.array_split(X_copy, partitions)
pool = mp.Pool(cores)
# Here reduce function - concationation of transformed batches
data = pd.concat(
pool.map(self._preprocess_part, data_split)
)
pool.close()
pool.join()
return data
def _transform_part(self, df_part):
return df_part.apply(self._transform_one)
def _transform_one(self, line):
# some kind of transformations here
return line
self._preprocess_part
是什么?我只找到了 _transform_part
。 - Phun原生的Python解决方案(使用numpy),可应用于整个数据框,就像原始问题所要求的那样(不仅限于单个列)。
import numpy as np
import multiprocessing as mp
dfs = np.array_split(df, 8000) # divide the dataframe as desired
def f_app(df):
return df.apply(myfunc, axis=1)
with mp.Pool(mp.cpu_count()) as pool:
res = pd.concat(pool.map(f_app, dfs))
这是另一个使用Joblib和scikit-learn的辅助代码的例子。如果你已经安装了scikit-learn,它会很轻便。如果你更喜欢对它所做的事情有更多的控制权,那么它是很好的选择,因为joblib很容易被修改。
from joblib import parallel_backend, Parallel, delayed, effective_n_jobs
from sklearn.utils import gen_even_slices
from sklearn.utils.validation import _num_samples
def parallel_apply(df, func, n_jobs= -1, **kwargs):
""" Pandas apply in parallel using joblib.
Uses sklearn.utils to partition input evenly.
Args:
df: Pandas DataFrame, Series, or any other object that supports slicing and apply.
func: Callable to apply
n_jobs: Desired number of workers. Default value -1 means use all available cores.
**kwargs: Any additional parameters will be supplied to the apply function
Returns:
Same as for normal Pandas DataFrame.apply()
"""
if effective_n_jobs(n_jobs) == 1:
return df.apply(func, **kwargs)
else:
ret = Parallel(n_jobs=n_jobs)(
delayed(type(df).apply)(df[s], func, **kwargs)
for s in gen_even_slices(_num_samples(df), effective_n_jobs(n_jobs)))
return pd.concat(ret)
result = parallel_apply(my_dataframe, my_func)
而不是
df["new"] = df["old"].map(fun)
from joblib import Parallel, delayed
df["new"] = Parallel(n_jobs=-1, verbose=10)(delayed(fun)(i) for i in df["old"])
import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as pool:
df["new"] = pool.map(fun, df["old"])
allow_dask_on_strings(enable=True)
:df.swifter.allow_dask_on_strings(enable=True).apply(some_function)
。来源:https://github.com/jmcarpenter2/swifter/issues/45 - learner