如何使用 Pandas 中的 apply 函数并行比较许多(模糊)字符串?

29
我有以下问题
我有一个数据框master,其中包含诸如以下句子:
master
Out[8]: 
                  original
0  this is a nice sentence
1      this is another one
2    stackoverflow is nice

对于Master中的每一行,我使用fuzzywuzzy在另一个数据帧中查找最佳匹配。这个数据帧被称为slave。我使用fuzzywuzzy是因为两个数据帧之间匹配的句子可能会有些不同(额外的字符等)。

例如,slave可以是:

slave
Out[10]: 
   my_value                      name
0         2               hello world
1         1           congratulations
2         2  this is a nice sentence 
3         3       this is another one
4         1     stackoverflow is nice

这里是一个完全功能的,极好的,紧凑的工作示例 :)

from fuzzywuzzy import fuzz
import pandas as pd
import numpy as np
import difflib


master= pd.DataFrame({'original':['this is a nice sentence',
'this is another one',
'stackoverflow is nice']})


slave= pd.DataFrame({'name':['hello world',
'congratulations',
'this is a nice sentence ',
'this is another one',
'stackoverflow is nice'],'my_value': [2,1,2,3,1]})

def fuzzy_score(str1, str2):
    return fuzz.token_set_ratio(str1, str2)

def helper(orig_string, slave_df):
    #use fuzzywuzzy to see how close original and name are
    slave_df['score'] = slave_df.name.apply(lambda x: fuzzy_score(x,orig_string))
    #return my_value corresponding to the highest score
    return slave_df.ix[slave_df.score.idxmax(),'my_value']

master['my_value'] = master.original.apply(lambda x: helper(x,slave))
这个问题的价值是一百万美元:我能并行化上面的应用程序代码吗? 毕竟,每一行在master中都要与slave中的所有行进行比较(slave数据集很小,我可以将许多副本保存到RAM中)。
我不知道为什么不能同时处理多个行的比较。
问题是:我不知道如何做到这一点,或者是否可能。
非常感谢任何帮助!

1
我注意到你在这里添加了dask标签。你已经尝试使用dask并遇到了问题吗? - MRocklin
1
Dask使用cloudpickle来序列化函数,因此可以轻松处理其他数据集上的lambda和闭包。 - MRocklin
1
大致相同,但我会使用“assign”而不是列赋值,并且我会为“apply”提供有关所期望的列的元数据。如果您创建一个最小可重现的示例,那么提供明确的解决方案就会更容易。例如,一些可以复制并粘贴到本地机器上工作的东西。 - MRocklin
1
让我们在聊天中继续这个讨论。点击此处进入聊天室 - MRocklin
你好@MRocklin,请查看我的更新答案。谢谢! - ℕʘʘḆḽḘ
显示剩余6条评论
3个回答

36

你可以使用Dask.dataframe并行化此过程。

>>> dmaster = dd.from_pandas(master, npartitions=4)
>>> dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave), name='my_value'))
>>> dmaster.compute()
                  original  my_value
0  this is a nice sentence         2
1      this is another one         3
2    stackoverflow is nice         1

此外,您应该考虑在使用线程与进程之间做出权衡。 您的模糊字符串匹配几乎肯定不会释放GIL,因此使用多个线程将不会带来任何好处。但是,使用进程将导致数据序列化并在计算机中移动,这可能会稍微降低速度。

您可以通过管理compute()方法的get=关键字参数,在使用线程和进程或分布式系统之间进行实验。

import dask.multiprocessing
import dask.threaded

>>> dmaster.compute(get=dask.threaded.get)  # this is default for dask.dataframe
>>> dmaster.compute(get=dask.multiprocessing.get)  # try processes instead

天才!只有一个快速的问题:我有一台8核心的Xeon机器,它能在上面工作吗?我不能像你建议的那样使用分布式系统。 - ℕʘʘḆḽḘ
3
多进程可以加速你的计算,但会因进程间数据传输而减慢速度。如果不了解你的问题比我想象的要多,那么我就无法确定是否能加速。 我建议尝试并进行性能分析。 - MRocklin
感谢@MRocklin!我相信许多人会发现这篇文章很有用。在浏览http://dask.pydata.org/en/latest/install.html后,我自己仍然完全不了解`dask`。 - ℕʘʘḆḽḘ
如果你还有20秒钟,可以跟进一下吗?我应该也调整npartitions吗? - ℕʘʘḆḽḘ
我有很多内存(128GB),所以我应该使用很多npartitions吗? - ℕʘʘḆḽḘ
很抱歉打扰您,不过您能不能给我一点提示?我看到现在有很多进程运行了相当长的时间,但是 CPU 使用率仍然非常低,每隔几秒钟就会有一些 2%-5% 的峰值。这是否是预期的情况?再次感谢您花费宝贵的时间。 - ℕʘʘḆḽḘ

4

我正在处理类似问题,并想为其他可能遇到此问题的人提供更完整的工作解决方案。@MRocklin不幸地在所提供的代码片段中有一些语法错误。我不是Dask专家,因此无法对某些性能考虑进行评论,但这应该能够像@MRocklin建议的那样完成您的任务。 这是使用Dask版本0.17.2Pandas版本0.22.0

import dask.dataframe as dd
import dask.multiprocessing
import dask.threaded
from fuzzywuzzy import fuzz
import pandas as pd

master= pd.DataFrame({'original':['this is a nice sentence',
'this is another one',
'stackoverflow is nice']})

slave= pd.DataFrame({'name':['hello world',
'congratulations',
'this is a nice sentence ',
'this is another one',
'stackoverflow is nice'],'my_value': [1,2,3,4,5]})

def fuzzy_score(str1, str2):
    return fuzz.token_set_ratio(str1, str2)

def helper(orig_string, slave_df):
    slave_df['score'] = slave_df.name.apply(lambda x: fuzzy_score(x,orig_string))
    #return my_value corresponding to the highest score
    return slave_df.loc[slave_df.score.idxmax(),'my_value']

dmaster = dd.from_pandas(master, npartitions=4)
dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave),meta=('x','f8'))

然后,获取你的结果(就像在这个解释器会话中一样):
In [6]: dmaster.compute(get=dask.multiprocessing.get)                                             
Out[6]:                                          
                  original  my_value             
0  this is a nice sentence         3             
1      this is another one         4             
2    stackoverflow is nice         5    

2

以下答案基于旧版API。一些新的代码:

dmaster = dd.from_pandas(master, npartitions=4)
dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave),meta=('x','f8'))
dmaster.compute(scheduler='processes') 

个人建议在辅助函数中放弃对模糊匹配得分的apply调用,直接在那里执行操作。

您可以使用这些提示来修改调度程序。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接