Pandas迭代中每一行的并行处理

3
我有一个名为df_fruits的水果数据框。
index      name
1          apple
2          banana
3          strawberry

同时,它的市场价格以以下方式存储在MySQL数据库中:

category      market      price
apple         A           1.0
apple         B           1.5
banana        A           1.2
banana        A           3.0
apple         C           1.8
strawberry    B           2.7        
...

df_fruits 迭代期间,我希望进行一些处理。

下面的代码是非并行版本。

def process(fruit):
   # make DB connection
   # fetch the prices of fruit from database
   # do some processing with fetched data, which takes a long time
   # insert the result into DB
   # close DB connection

for idx, f in df_fruits.iterrows():
    process(f)
我想做的是对df_fruits中的每一行进行并行的process操作,因为df_fruits有很多行,市场价格表的大小也相当大(获取数据需要很长时间)。

正如您所看到的,行之间的执行顺序不重要,并且没有共享数据。

df_fruits的迭代中,我不确定在哪里定位`pool.map()`。我是否需要在并行执行之前将行拆分并将块分配给每个进程?(如果是这样,那么比其他进程先完成工作的进程将处于空闲状态?)

我已经研究了并行处理包“pandarallel”,但我不能使用它(我的操作系统是Windows)。

任何帮助将不胜感激。


数据有多大? - Yash
@Yash,我不能确切地说,但有时仅获取市场价格就需要几分钟以上。问题不仅在于数据的大小,还在于数据库结构不良(这会使获取时间更长)。 - cointreau
3个回答

3

完全不需要使用 pandas。你可以直接使用来自multiprocessing包的PoolPool.map()需要两个参数:一个函数和一个值列表。

因此,你可以这样做:

from multiprocessing import Pool

n = 5  # Any number of threads
with Pool(n) as p:
    p.map(process, df_fruits['name'].values)

这将逐个遍历 df_fruits 数据框中的所有水果。请注意,由于 process 函数旨在将结果写回数据库,因此此处没有返回结果。


如果您想要考虑每行中的多列,则可以将 df_fruits['name'].values 更改为:

df_fruits[cols].to_dict('records')

这将给preprocess一个字典作为输入,例如:

{'name': 'apple', 'index': 1, ...}

1
@W.Cointreau:不行,你不能这样做。但是,你可以使用df_fruits[cols].values(其中cols是你想要考虑的列),而不是使用df_fruits['name'].values。或者你可以直接使用df_fruits.values。这将给你一个列表形式的值。通常更容易使用字典,所以你可以尝试使用df_fruits.to_dict('records') - Shaido
1
由于process函数有更多的参数,我参考了https://dev59.com/j18e5IYBdhLWcg3wnrcQ。最后,我需要使用`partial`,例如`func = partial(process, param1, param2 ...),然后pool.map(func, df_fruits.to_dict('records')`。非常感谢! - cointreau
@Shaido 如果函数有两个参数,并且存在两列需要引用该函数,那么应该按照什么流程进行操作? - RF1991
@Shaido 你是指 p.starmap(process, df_fruits['index','name'].values) 吗? - RF1991
@Shaido 我非常感谢你的帮助,最诚挚的问候。 - RF1991
显示剩余3条评论

1

是的,虽然pandas库没有直接提供这个功能,但是它是可以实现的。

也许你可以尝试像这样做:

def do_parallel_stuff_on_dataframe(df, fn_to_execute, num_cores):
    # create a pool for multiprocessing
    pool = Pool(num_cores)

    # split your dataframe to execute on these pools
    splitted_df = np.array_split(df, num_cores)

    # execute in parallel:
    split_df_results = pool.map(fn_to_execute, splitted_df)

    #combine your results
    df = pd.concat(split_df_results)

    pool.close()
    pool.join()
    return df

1
您可能可以做类似这样的事情:

with Pool() as pool:
    # create an iterator that just gives you the fruit and not the idex
    rows = (f for _, f in df_fruits.iterrows())
    pool.imap(process, rows)

如果您不关心结果,或者愿意以任何顺序获取结果,或者不在乎结果,则可以使用其他池原语之一,而不是使用map。

啊,我从来没有听说过imap。我在这里找到了一些知识:https://dev59.com/5mgu5IYBdhLWcg3wZma8#11338089。谢谢。 - cointreau
实际上,我刚刚进行了一项实验,对于100,000个元素的快速操作,“map”似乎比“imap”更快。它们都允许迭代器作为参数。尝试实验并查看哪个最适合您。当然,我们不知道您是否需要处理结果。 - Frank Yellin
好的,我会尝试一下。我预计imapmap更快,因为在process中获取数据是主要的瓶颈,而每个作业都不会影响其他作业。当我使用map和几个工作进程时,似乎有些工作进程等待其他进程完成,尽管它们不需要等待... - cointreau

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接