Python:ProcessPoolExecutor vs ThreadPoolExecutor

4

我有以下的函数,它会随机打乱数据框中一列的值,并对包括被随机打乱列的整个数据框使用RandomForestClassifier来获取准确率分数。

我想同时对数据框的每一列运行这个函数,因为数据框非常大,包含500k行和1k列。关键是每次只随机打乱一列。

然而,我很困惑为什么ProcessPoolExecutorThreadPoolExecutor慢得多。我原以为ThreadPoolExecutor只适用于I/O任务更快。在这种情况下,它不涉及读取或写入任何文件。

或者我做错了什么?有没有更高效或更好的方法来优化这段代码,以实现并发执行和提高速度?

def randomShuffle(colname, X, y, fit):
    out = {'col_name': colname}
    X_= X.copy(deep = True)
    np.random.shuffle(X_[colname].values) # permutation of a single column
    pred = fit.predict(X_)
    out['scr'] = accuracy_score(y, pred)
    return out

def runConcurrent(classifier, X,y):
    skf = KFold(n_splits=5, shuffle = False)
    acc_scr0, acc_scr1 = pd.Series(), pd.DataFrame(columns = X.columns)
    # split data to training and validation
    for i, (train_idx, val_idx) in enumerate(skf.split(X,y)):
        X_train, y_train = X.iloc[train_idx,:], y.iloc[train_idx]
        X_val, y_val = X.iloc[val_idx,:], y.iloc[val_idx]
        
        fit = classifier.fit(X=X_train, y=y_train)
        # accuracy score
        pred = fit.predict(X_val)
        acc_scr0.loc[i] = accuracy_score(y_val, pred)
        
        # with concurrent.futures.ProcessPoolExecutor() as executor:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            results = [executor.submit(randomShuffle, colname = j, X= X_val, y= y_val, fit = fit, labels = classifier.classes_) for j in X.columns]
            for res in concurrent.futures.as_completed(results):
                acc_scr1.loc[i, res.result()['col_name']] = res.result()['acc_scr']
    return None

1
这在之前已经被问过一两次。 - sglmr
我不认为我的函数在线程之间共享任何对象。而且,我的函数是否进行了任何I/O操作? - user1769197
如果要pickle的数据量相对于计算量较大,进程可能比线程慢。 - Nick ODell
2个回答

2

在没有测试的情况下很难看清楚,因为多进程的速度取决于许多因素。首先是通信开销,如果需要传输大量数据,速度会变慢,而且创建的任务数量也很重要。

创建一个任务会带来相当大的开销,必须将其与调用方法所花费的时间相对比。如果一个方法只需要几分之一秒就能完成,并且您调用它数千次,那么创建任务的开销就是显著的。另一方面,如果该函数需要多秒钟才能返回,那么开销就可以忽略不计。

我无法确定 randomShuffle 的速度有多快,但您可以尝试使用 map 函数和设置 chunksize 来查看是否加快了任何操作。

from functools import partial

...

with ProcessPoolExecutor() as executor:
    chunksize = len(points) // (executor._max_workers * 4)
    randomShuffleWrapper = partial(randomShuffle, X=X_val, y=y_val, fit=fit, labels=classifier.classes_)
    results = list(executor.map(randomShuffleWrapper, X.columns, chunksize=chunksize))

所有调用 randomShuffle 的唯一不同之处就是 colname。因此,创建一个部分函数来设置所有其他参数,你的新函数只需要将 colname 作为第一个参数。现在我们还需要设置一个合适的 chunksize。

这是一个超参数,实际上没有通用的好值,您可能需要尝试不同的值以找到最佳值。它会创建可迭代对象的块,并包装您的函数,以便一个任务计算块中所有条目的输出。

因此,如果您有1000个条目和chunksize为100,则只会创建10个任务,每个任务计算100个条目。这将大大减少创建和完成任务的开销。

作为起点,如果未给出 chunksize,则我使用 multiprocessing.pool.PoolProcessPoolExecutor.map() 将 chunksize 设置为默认值 1,基本上与您已经在做的事情相同,为每个元素创建一个任务。

我不知道你传递给函数的所有东西有多大。具体来说,X=X_val, y=y_val, fit=fit, labels=classifier.classes_。如果它们很大,那么通信开销会很大,因为所有内容都将被序列化发送和反序列化。所以还要检查它们是否很大,以及是否必须如此。通常情况下,您只想发送绝对必要的内容,并且函数的返回值也应尽可能小。
这就是为什么你建议使用chunksize来分割数据的原因。我的理解正确吗?
另外一个问题:假设我将列名分成4个块,这意味着会为这4个块创建4个进程吗?每个块的数据是如何处理的?是使用for循环还是多进程/多线程?
也许我可以更详细地解释一下chunksize的作用,因为它实际上非常简单,可以直接在代码中看到。我将引用Anaconda Python 3.9中的代码 python3.9/concurrent/futures/process.py

它对于ProcessPoolExecutor类有以下代码行。

class ProcessPoolExecutor(_base.Executor):
    ...
    def map(self, fn, *iterables, timeout=None, chunksize=1):
        ...
        results = super().map(partial(_process_chunk, fn), _get_chunks(*iterables, chunksize=chunksize), timeout=timeout)

_get_chunks将可迭代对象平均分成大小为chunksize的部分,如果可迭代对象的长度不可被chunksize整除,则可能会有一个较小的部分。

partial(_process_chunk, fn)创建了一个_process_chunk的偏函数,其形式如下:

def _process_chunk(fn, chunk):
    return [fn(*args) for args in chunk]

所以它只是在每个块中迭代每个元素并调用一个函数,对于你的情况是randomShuffle。这意味着一个任务不仅仅是调用一次randomShuffle,而是调用chunksize次。所有结果都被收集到一个列表中,然后进行合并。

super().map() 调用表示使用父类 Executor 的 map 函数:

class Executor(object)
    ...
    def map(self, fn, *iterables, timeout=None, chunksize=1):
        ...
        fs = [self.submit(fn, *args) for args in zip(*iterables)]
        ...

正如您所看到的,在这一点上,也只有submit函数被调用以针对所有iterables。在这一点上,fn是之前创建的部分函数partial(_process_chunk, fn),而iterables_get_chunks(*iterables, chunksize=chunksize)返回的内容(原始iterables的等大小的块)。因此,ProcessPoolExecutor的所有映射函数做的就是包装您的函数,并将您的iterables划分成块,然后调用submit

所有这些都是为了减少任务数量(submit调用),让任务做更多事情,即对某些iterables的每个元素调用给定的函数。

那么任务实际上是如何映射到进程的呢?通过创建一个ProcessPoolExecutor,您创建了一个进程池。数量由系统上的核心数定义,或者可以通过max_workers参数进行定义。

当调用“submit”时,会为其分配一个工作进程,因此工作进程接收运行函数所需的所有数据,并将函数的输出返回给主进程。这种数据传输通过序列化和反序列化数据完成,通常使用“pickle”模块。这也是许多开销产生的地方,因为进程间数据传输速度较慢。
因此,如果你使用“max_workers=10”创建了“ProcessPoolExecutor”,理论上可以同时执行10个任务(如果你当然有10个核心)。池和任务的抽象使你无需担心任务在何处运行。你只需要提交所有需要完成的任务,让“ProcessPoolExecutor”找出如何最好地将任务分配给进程。

1
这通常不是一个好事情,尤其当你的可迭代对象中有很多条目时。相关因素可能更多地与每个任务的工作量有关,而不是任务的数量:如果每个任务需要几分钟,你可以单独安排它们,IPC 不会起到很大的作用。然而,如果每个任务只需要几分之一秒的时间,IPC 开销就会开始显现,并可能压倒实际运行时间。 - Masklinn
谢谢你的回答。切换回ProcessPoolExecutor的原因是因为我的函数实际上并没有处理任何I/O任务,但是生成许多进程(每列一个进程,我有1k列)的开销导致整个过程变慢。这就是你建议使用chunksize来分割任务的原因吗?我的理解正确吗? - user1769197
@user1769197 我在我的回答中尝试回答了你的问题,请查看更新内容。如果你认为我的回答解决了你的问题,请也接受它。 - Nopileos
非常感谢您的更新。是的,基本上就是尝试创建批次。在每个批次中,使用了一个for循环。 - user1769197
最后一个问题:如果我将executor.map更改为executor.submit,会导致性能降低吗? - user1769197
显示剩余11条评论

-3
ProcessPoolExecutor和ThreadPoolExecutor之间的性能差异可以归因于Python中的全局解释器锁(GIL)。GIL只允许一个线程同时执行Python字节码,即使在多核系统上也是如此。这意味着在像训练机器学习模型这样的CPU密集型任务中,使用ProcessPoolExecutor的多个进程实际上会由于进程间通信的开销而减慢执行速度。
另一方面,ThreadPoolExecutor可以更高效,因为它在单个进程内利用多个线程,从而更好地利用CPU资源。特别是在涉及I/O操作或阻塞调用的任务时,这一点尤为明显。
在您的情况下,由于您正在使用scikit-learn的RandomForestClassifier来训练模型,这是一个CPU密集型任务,因此使用ThreadPoolExecutor是一个更好的选择。ProcessPoolExecutor中进程间通信的开销可能会抵消并行执行的好处。
为了进一步优化代码,您可以考虑以下建议:
  1. 在KFold中使用较少的折叠次数以减少迭代次数。
  2. 不要为每一列创建一个新的fit对象,可以在循环外部创建一次,并将其作为参数传递给randomShuffle函数。
  3. 考虑使用更高效的算法或模型来更有效地处理大型数据集,例如梯度提升算法。

2
我怀疑这个回答是ChatGPT(也就是说,它看起来有点像,而且你至少发布了另一个令人信服的ChatGPT回答https://dev59.com/p1QK5IYBdhLWcg3wSeK9#76674730/)。 - DavidW
我根据自己所知写下了这些话。作为一个新手,我也不知道任何形式的AI帮助都是不被接受的。我根据谷歌和其他网站的内容自己写下来,然后把答案给ChatGPT以便它能纠正和整理我的拙劣表达和排列。抱歉,似乎有点胡扯。我也没有意识到如果有人不喜欢解释,他们会投反对票并批评事情。我发了3条评论,人们非常苛刻。对于污染环境我感到抱歉。非常抱歉。由于被投反对票,我无法再发表评论。我感到很难过,而且感觉真的很愚蠢。 - Abdullah Shafi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接