如何让Pandas DataFrame的apply()函数使用所有CPU核心?

191
截至2017年8月,Pandas的DataFame.apply()仍然只能使用单个核心,这意味着当您运行df.apply(myfunc, axis=1)时,多核计算机将浪费大部分计算时间。
如何在并行中使用所有内核来运行数据帧上的应用程序?
12个回答

160

你可以使用swifter包:

pip install swifter

(请注意,您可能希望在虚拟环境中使用此功能,以避免与已安装的依赖项版本冲突。)
Swifter作为pandas的插件工作,允许您重复使用apply函数:
import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

它将自动找到最有效的并行化函数方式,无论它是否矢量化(如上例)。GitHub上提供更多示例性能比较。请注意,该软件包正在积极开发中,因此API可能会更改。
还要注意,对于字符串列,不会自动工作。使用字符串时,Swifter将回退到“简单”的Pandas apply,这不会并行化。在这种情况下,即使强制使用dask也不会创建性能提升,您最好手动拆分数据集并使用multiprocessing进行并行化

3
出于纯好奇,是否有一种方法可以在进行并行应用程序时限制它使用的核心数?我有一个共享服务器,所以如果我占用了全部32个核心,那么没有人会开心。 - Maksim Khaitovich
1
@MaximHaytovich 我不知道。Swifter在后台使用dask,所以也许它会尊重这些设置:https://dev59.com/ClkR5IYBdhLWcg3w7hfE#40633117 — 否则我建议在GitHub上开一个问题。作者非常乐于回应。 - slhck
2
+1 for Swifter。它不仅使用最佳方法并行化,还通过tqdm集成了进度条。 - scribu
6
对于字符串,只需像这样添加 allow_dask_on_strings(enable=True)df.swifter.allow_dask_on_strings(enable=True).apply(some_function)。来源:https://github.com/jmcarpenter2/swifter/issues/45 - learner
1
安装Swifter会导致pip降级多个重要软件包到旧版本,其中包括numpy和pandas。 - azureai
显示剩余14条评论

138

最简单的方法是使用Dask的map_partitions。你需要以下导入(你需要pip install dask):

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get

语法是

data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def myfunc(x,y,z, ...): return <whatever>

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)  

我相信如果您有16个核心,30是适当的分区数量。为了完整起见,我在我的机器上计时了差异(16个核心):


data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)

ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)  
def vectorized(): return myfunc(data['col1'], data['col2']  )

t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))

28.16970546543598

t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))

2.708152851089835

t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))

0.010668013244867325

dask apply 在处理分区时,相较于 pandas apply,速度提升了10倍。当然,如果您的函数可以向量化,那就应该这样做 - 在本例中,函数 (y*(x**2+1)) 可以被轻松向量化,但有很多东西无法向量化。


2
很高兴知道,谢谢您的发布。您能解释一下为什么选择30个分区吗?当更改此值时,性能是否会发生变化? - Andrew L
6
我假设每个分区都由一个独立的进程处理,并且使用16个核心,我认为可以同时运行16或32个进程。我进行了尝试,性能似乎在32个分区时得到改善,但进一步增加没有任何好处。我认为在四核机器上,您需要8个分区等等。请注意,我注意到在16和32之间确实有一些改善,因此我认为您确实需要2倍于NUM_PROCESSORS的数量。 - Roko Mijic
17
唯一的问题是,“get=”关键字已被弃用。请改用“scheduler=”关键字,并使用所需调度程序的名称,如“threads”或“processes”。 - wordsforthewise
7
对于dask v0.20.0及以上版本,请使用ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(scheduler='processes'),或者选择其他调度选项。当前代码会抛出"TypeError: The get= keyword has been removed. Please use the scheduler= keyword instead with the name of the desired scheduler like 'threads' or 'processes'"的错误提示。 - mork
2
在执行此操作之前,请确保数据框中没有重复的索引,否则会抛出“ValueError: cannot reindex from a duplicate axis”错误。为了解决这个问题,您可以通过df = df[~df.index.duplicated()]删除重复的索引,或者通过df.reset_index(inplace=True)重置索引。 - Habib Karbasian
显示剩余10条评论

48

你可以尝试使用 pandarallel 替代:一个简单有效的工具,可在所有CPU上并行运行您的pandas操作(适用于Linux和macOS)。

  • 并行化有成本(实例化新进程、通过共享内存发送数据等),因此仅当要并行计算的量足够大时,并行化才是有效的。对于非常少量的数据,使用并行化并不总是值得的。
  • 应用的函数不能是lambda函数。
from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)

# ALLOWED
def func(x):
    return sin(x**2)

df.parallel_apply(func, axis=1)

请查看https://github.com/nalepae/pandarallel


1
你好,我遇到了一个问题解决不了。使用pandarallel时出现错误:AttributeError: Can't pickle local object 'prepare_worker.<locals>.closure.<locals>.wrapper'。你能帮我解决一下吗? - Alex Cam
@Alex 对不起,我不是那个模块的开发者。你的代码是什么样子的?你可以尝试将你的“内部函数”声明为全局变量吗?(只是猜测) - kkkobelief24
@AlexCam 你的函数应该在其他函数之外定义,这样Python才能将其pickle化以进行多进程处理。 - Kenan
2
@G_KOBELIEF 在Python >3.6中,我们可以使用lambda函数与pandaparallel。 - learner
我们可以指定核心数量吗? - Nathan B

47

如果您希望保持原生Python:

import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(f, df['col'])

将函数f并行应用于数据框df的列col


按照这种方法,我从pandas/core/frame.py__setitem__中得到了一个ValueError: Length of values does not match length of index。不确定我是否做错了什么,或者将值分配给df['newcol']是否是线程安全的。 - Rattle
2
你可以将pool.map写入一个中间temp_result列表,以便检查长度是否与df匹配,然后执行df['newcol'] = temp_result。 - Olivier Cruchant
你的意思是创建新列吗?你会使用什么? - Olivier Cruchant
是的,将map的结果分配给数据框的新列。map不会返回发送到函数f的每个块的结果列表吗?那么当您将其分配给列“newcol”时会发生什么?使用Pandas和Python 3。 - Mina
它实际上运行非常顺畅!你试过了吗?它创建了一个与df相同长度的列表,与发送的顺序相同。它以并行方式执行c2 = f(c1)。在Python中没有更简单的多进程方法。就性能而言,Ray似乎也可以做出好的表现(https://towardsdatascience.com/10x-faster-parallel-python-without-python-multiprocessing-e5017c93cce1),但它不够成熟,在我的经验中安装并不总是顺利的。 - Olivier Cruchant
显示剩余2条评论

14

我只是想给关于Dask的更新答案。

import dask.dataframe as dd

def your_func(row):
  #do something
  return row

ddf = dd.from_pandas(df, npartitions=30) # find your own number of partitions
ddf_update = ddf.apply(your_func, axis=1).compute()

在没有使用Dask的情况下,对于我的10万条记录:

CPU时间:用户6分32秒,系统:100毫秒,总共:6分32秒 墙上时间:6分32秒

使用Dask后:

CPU时间:用户5.19秒,系统:784毫秒,总共:5.98秒 墙上时间:1分3秒


12

为了使用所有物理或逻辑核心,您可以尝试 mapply 作为 swifterpandarallel 的替代方案。

您可以在初始化时设置核心数量(以及分块行为):

import pandas as pd
import mapply

mapply.init(n_workers=-1)

...

df.mapply(myfunc, axis=1)

默认情况下(n_workers=-1),该软件包使用系统上所有可用的物理CPU。如果您的系统使用超线程(通常会显示出两倍于实际物理CPU数量的逻辑核心),mapply将会生成一个额外的工作者来优先处理多进程池,而不是系统上的其他进程。

根据您对“所有内核”的定义,您也可以使用所有逻辑内核(请注意,这样做会导致CPU束缚过程争夺物理CPU,可能会减慢操作速度):

import multiprocessing
n_workers = multiprocessing.cpu_count()

# or more explicit
import psutil
n_workers = psutil.cpu_count(logical=True)

1
易于设置! - Ali Ait-Bachir

4

这是一个sklearn基础转换器的示例,其中使用了pandas apply进行并行处理。

import multiprocessing as mp
from sklearn.base import TransformerMixin, BaseEstimator

class ParllelTransformer(BaseEstimator, TransformerMixin):
    def __init__(self,
                 n_jobs=1):
        """
        n_jobs - parallel jobs to run
        """
        self.variety = variety
        self.user_abbrevs = user_abbrevs
        self.n_jobs = n_jobs
    def fit(self, X, y=None):
        return self
    def transform(self, X, *_):
        X_copy = X.copy()
        cores = mp.cpu_count()
        partitions = 1

        if self.n_jobs <= -1:
            partitions = cores
        elif self.n_jobs <= 0:
            partitions = 1
        else:
            partitions = min(self.n_jobs, cores)

        if partitions == 1:
            # transform sequentially
            return X_copy.apply(self._transform_one)

        # splitting data into batches
        data_split = np.array_split(X_copy, partitions)

        pool = mp.Pool(cores)

        # Here reduce function - concationation of transformed batches
        data = pd.concat(
            pool.map(self._preprocess_part, data_split)
        )

        pool.close()
        pool.join()
        return data
    def _transform_part(self, df_part):
        return df_part.apply(self._transform_one)
    def _transform_one(self, line):
        # some kind of transformations here
        return line

更多信息请参见https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8


1
self._preprocess_part 是什么?我只找到了 _transform_part - Phun

3

原生的Python解决方案(使用numpy),可应用于整个数据框,就像原始问题所要求的那样(不仅限于单个列)。

import numpy as np
import multiprocessing as mp

dfs = np.array_split(df, 8000) # divide the dataframe as desired

def f_app(df):
    return df.apply(myfunc, axis=1)

with mp.Pool(mp.cpu_count()) as pool:
    res = pd.concat(pool.map(f_app, dfs))

2

这是另一个使用Joblib和scikit-learn的辅助代码的例子。如果你已经安装了scikit-learn,它会很轻便。如果你更喜欢对它所做的事情有更多的控制权,那么它是很好的选择,因为joblib很容易被修改。

from joblib import parallel_backend, Parallel, delayed, effective_n_jobs
from sklearn.utils import gen_even_slices
from sklearn.utils.validation import _num_samples


def parallel_apply(df, func, n_jobs= -1, **kwargs):
    """ Pandas apply in parallel using joblib. 
    Uses sklearn.utils to partition input evenly.
    
    Args:
        df: Pandas DataFrame, Series, or any other object that supports slicing and apply.
        func: Callable to apply
        n_jobs: Desired number of workers. Default value -1 means use all available cores.
        **kwargs: Any additional parameters will be supplied to the apply function
        
    Returns:
        Same as for normal Pandas DataFrame.apply()
        
    """
    
    if effective_n_jobs(n_jobs) == 1:
        return df.apply(func, **kwargs)
    else:
        ret = Parallel(n_jobs=n_jobs)(
            delayed(type(df).apply)(df[s], func, **kwargs)
            for s in gen_even_slices(_num_samples(df), effective_n_jobs(n_jobs)))
        return pd.concat(ret)

用法:result = parallel_apply(my_dataframe, my_func)

1

而不是

df["new"] = df["old"].map(fun)

执行
from joblib import Parallel, delayed
df["new"] = Parallel(n_jobs=-1, verbose=10)(delayed(fun)(i) for i in df["old"])

对我来说,这比原来稍微好一点。
import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as pool:
    df["new"] = pool.map(fun, df["old"])

当您获得进度指示和自动分批处理时,如果作业非常小,则会更加方便。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接