如何并行化Pandas数据框逐行应用(apply())方法

3
我有以下代码:
import pandas as pd
import time

def enrich_str(str):
        
    val1 = f'{str}_1'
    val2 = f'{str}_2'
    val3 = f'{str}_3'
    time.sleep(3)
    
    return val1, val2, val3
    
def enrich_row(passed_row):
    col_name = str(passed_row['colName'])
    my_string = str(passed_row[col_name])
    
    val1, val2, val3 = enrich_str(my_string)
    
    passed_row['enriched1'] = val1
    passed_row['enriched2'] = val2
    passed_row['enriched3'] = val3
    
    return passed_row


df = pd.DataFrame({'numbers': [1, 2, 3, 4, 5], 'colors': ['red', 'white', 'blue', 'orange', 'red']}, 
                  columns=['numbers', 'colors'])

df['colName'] = 'colors'

tic = time.perf_counter()
enriched_df = df.apply(enrich_row, col_name='colors', axis=1)
toc = time.perf_counter()

print(f"{df.shape[0]} rows enriched in {toc - tic:0.4f} seconds")

enriched_df

获取如下数据框的输出需要15秒钟:

enter image description here

现在我想使用多线程在我的机器上并行执行丰富操作。我尝试了很多解决方案,例如 Dasknumba,但似乎都不是很直接。

然后我偶然发现了 multiprocessing 库及其 pool.imaps() 方法。因此,我尝试运行以下代码:

import multiprocessing as mp

tic = time.perf_counter()
pool = mp.Pool(5)
result = pool.imap(enrich_row, df.itertuples(), chunksize=1)
pool.close()
pool.join()
toc = time.perf_counter()

print(f"{df.shape[0]} rows enriched in {toc - tic:0.4f} seconds")
result

大约需要2秒钟,result不是Pandas数据框架。我无法找出我的错误所在。

for i in range(1,4): df[f'enriched{i}'] = df.colors + f'_{i}' ?? 对于范围在1到4之间的i: df[f'enriched{i}'] = df.colors + f'_{i}' ?? - Nk03
@Nk03 我对并行化处理感兴趣,而不是简单的循环。 - lucazav
1
Dask 将会平稳运行。您可以遵循“map_partitions”的示例。话虽如此,您应该通常避免使用显式逐行循环,而是采用明显更快的列操作,就像上面建议的循环一样。 - Nick Becker
我之前尝试使用Dask dataframe的apply()方法处理用read_csv()读取的数据(120行)。但是这个过程没有进行并行化,而且我也不明白为什么。后来我发现这是因为dataframe的npartitions为1所导致的。通过对dataframe进行重新分区,使用repartition(npartitions=os.cpu_count()*2)方法解决了这个问题。 - lucazav
2个回答

1
我建议您使用pathos forkmultiprocessing,因为它会更好地处理数据框的pickling。 imap返回一个迭代器,而不是一个数据框,所以您需要将其转换回来:
def enrich_row(row_tuple):
    passed_row = row_tuple[1]
    col_name = str(passed_row['colName'])
    my_string = str(passed_row[col_name])
    
    val1, val2, val3 = enrich_str(my_string)
    
    passed_row['enriched1'] = val1
    passed_row['enriched2'] = val2
    passed_row['enriched3'] = val3
    
    return passed_row

df = pd.DataFrame({'numbers': [1, 2, 3, 4, 5], 'colors': ['red', 'white', 'blue', 'orange', 'red']}, 
                  columns=['numbers', 'colors'])

df['colName'] = 'colors'

from pathos.multiprocessing import Pool

tic = time.perf_counter()
result = Pool(8).imap(enrich_row, df.iterrows(), chunksize=1)
df = pd.DataFrame(result)
toc = time.perf_counter()

print(f"{df.shape[0]} rows enriched in {toc - tic:0.4f} seconds")
print(df)

注意,我正在使用df.iterrows(),它返回一个元组(row_number, row)的迭代器,因此我修改了enrich_row以处理这种格式。

谢谢你的快速回复,@albert。我预计并行化需要大约3-4秒才能完成所有工作。我运行了你的代码,但是1分钟后它仍在运行...pathos中有任何错误吗? - lucazav
我确认它会无限期运行。我被迫终止了该进程。 - lucazav
我不知道,这在我的机器上只需要3秒钟就能完成。你是在不同于帖子中的数据框上运行它吗? - Albert
不是的,我正在使用同一数据框架在Windows机器上通过VS Code运行它。 - lucazav

1

我接受了@albert的答案,因为它适用于Linux。无论如何,我发现Dask dataframe的apply()方法非常简单。正如我在之前的评论中提到的,一开始操作没有在包含120行数据的数据集上并行执行。后来我发现,这120行仅使用了Dask dataframe的一个分区。因此,只需要重新分区即可获得所需的并行性。这里是使用Dask的代码示例(其中引发了一些奇怪的警告...)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接