如何在 Python 中并行处理 Pandas DataFrame 的行

5

典型的例子:

我有一个DataFrame df

> df

       para0  para1   para2
0  17.439020   True    high
1  19.757758   True    high
2  12.434424   True  medium
3  14.789654   True     low
4  14.131464  False    high
5   9.900233   True    high
6  10.977869  False     low
7   8.004251   True  medium
8  11.468420  False     low
9  12.764453  False    high

每一行都包含一个函数 foobar 的参数集合:

def foobar(r):
    """ r is a row of df, does something, and it takes a long time"""
    if r.para1:
        x = r.para2
    else:
        x = 'low'
    return int(r.para0), (r.Index+13)%3 == 0, x

我想将foobar应用于df的每一行,收集它们的结果,并将它们与它们对应的参数一起存储在一个DataFrame中。

我的(目前)解决方案:

df['count'] = 0
df['valid'] = False
df['outpt'] = ''

def wrapper(r, df):
    c, v, o = foobar(r)
    df.ix[r.Index,'count'] = c
    df.ix[r.Index,'valid'] = v
    df.ix[r.Index,'outpt'] = o

for r in df.itertuples():
    wrapper(r, df)

这将产生:
> df
       para0  para1   para2  count  valid   outpt
0  17.439020   True    high   17.0  False    high
1  19.757758   True    high   19.0  False    high
2  12.434424   True  medium   12.0   True  medium
3  14.789654   True     low   14.0  False     low
4  14.131464  False    high   14.0  False     low
5   9.900233   True    high    9.0   True    high
6  10.977869  False     low   10.0  False     low
7   8.004251   True  medium    8.0  False  medium
8  11.468420  False     low   11.0   True     low
9  12.764453  False    high   12.0  False     low

以下是我的问题:

在现实生活中,函数foobar的计算成本很高,需要大约20-30分钟才能运行,而df通常只有100-2000行。我可以访问一个具有八个核心的机器,由于foobar仅依赖于当前处理的行而不依赖于其他任何内容,因此并行运行这些计算应该很容易。

当发生故障时(例如,如果有人意外关闭了机器),跳过已经处理过的行而不必从头开始进行所有操作将会更好。

我该如何做到这一点?


我尝试使用multiprocessing,但不幸地失败了:

from multiprocessing import Pool

pool = Pool(3)
results = []

for r in df.itertuples():
    results += [pool.apply_async(wrapper, r, df)]

随着:

> results[0].get()
…
/usr/lib/python3.5/multiprocessing/reduction.py in dumps(cls, obj, protocol)
     48     def dumps(cls, obj, protocol=None):
     49         buf = io.BytesIO()
---> 50         cls(buf, protocol).dump(obj)
     51         return buf.getbuffer()
     52

PicklingError: Can't pickle <class 'pandas.core.frame.Pandas'>: attribute lookup Pandas on pandas.core.frame failed

以下是我创建玩具数据帧的方法:

import pandas as pd
import numpy as np

df = pd.DataFrame({
    'para0' : pd.Series(
        np.random.gamma(12,size=10),
        dtype=np.float),
    'para1' : pd.Series(
        [(True,False)[i] for i in np.random.randint(0,2,10)],
        dtype=np.bool),
    'para2' : pd.Categorical(
        [('low','medium','high')[i] for i in np.random.randint(0,3,10)],
        ordered=True),
    })
3个回答

2

我不知道这是否有帮助,但是尝试使用list而不是itertuples

我的意思是像这样:

df_list = [[x[0], x[1],x[2]] for x in df.itertuples()]
for r in df_list:
    results += [pool.apply_async(wrapper, r, df)]

1

错误直接指出pickle无法序列化带有pandas.core type.frame.Pandas的对象,这些对象是df.itertuples()对象

>>>type(next(df.itertuples()))
>>>pandas.core.frame.Pandas

解决方案 №1

不要使用标准的multiprocessing模块(它在底层使用pickle),而是可以使用pathos.multiprocessing模块。

# pip install pathos
from pathos.multiprocessing import Pool

with Pool() as p:
    result = []
    for r in df.itertuples():
        results += [p.apply_async(wrapper, args=(r, df))

好的。但是让我们看看结果。

results[0]

      para0    para1    para2   count   valid   outpt
0   9.451356    False   medium  9.0     False   low
1   10.818135   False   low     NaN      NaN    NaN
2   13.438129   True    low     NaN      NaN    NaN
3   11.517698   False   medium  NaN      NaN    NaN
4   7.415294    False   low     NaN      NaN    NaN
5   14.500403   False   low     NaN      NaN    NaN
6   16.283561   True    medium  NaN      NaN    NaN
7   10.402704   True    medium  NaN      NaN    NaN
8   8.890628    True    medium  NaN      NaN    NaN
9   8.103542    False   medium  NaN      NaN    NaN
... ... ... ... ... ... ...

results[1]
       para0    para1   para2   count   valid   outpt
0   10.652145   False   medium   NaN     NaN    NaN
1   13.026500   False   high     13.0    False  low
2   8.030650    True    low      NaN     NaN    NaN
3   13.638145   False   low      NaN     NaN    NaN
4   12.118411   True    low      NaN     NaN    NaN
... ... ... ... ... ... ...

所有这些都是因为在wrapper函数中每次更改df数据帧。由于每个工作进程都收到了原始的df并对其进行更改。你不能这样做,这是一种非常糟糕的做法,可能会导致不明显的问题和错误。

解决方案 №2

那么干脆把wrapper函数删除掉怎么样?你只想要在数据帧中收集执行foobar函数的结果,并将它们附加到原始数据帧df中。让我们这样做。但我们仍然会使用pathos.multiprocessing中的Pool类。

def foobar(r):
    """ r is a row of df, does something, and it takes a long time"""
    if r.para1:
        x = r.para2
    else:
        x = 'low'
    return {'count':int(r.para0), 'valid': (r.Index+13)%3 == 0, 'outpt':x}

#create tasks
tasks= [r for r in df.itertuples()]
with Pool() as p:
    results = pd.DataFrame(p.map(foobar, tasks))

results
    count   valid   outpt
0   10     False    low
1   13     False    low
2   8       True    low
3   13     False    low
4   12     False    low
... ... ... ...
995 15      True    low
996 12     False    low
997 5      False    high
998 13      True    low
999 13     False    medium

然后:

result_df = pd.concat([df, results], axis=1)

解决方案 №3

太好了,它起作用了。但是我们能否通过使用标准的multiprocessing库来避免pickle的问题呢?让我再次提醒您,问题在于从df.itertuples()序列化对象,实际上,我们正在尝试将它们提供给Pool类的某个方法(apply_async, map等)。那么让我们在wrapper函数内部完成它?我们只需要添加一行:


def parallel_wrapper(df):
    for r in df.itertuples():
        c, v, o = foobar(r)
        df.at[r.Index,'count'] = c
        df.at[r.Index,'valid'] = v
        df.at[r.Index,'outpt'] = o
    return df


现在要并行执行此操作,我们只需要将原始数据框分成几个部分,并使用标准的multiprocessing库中的方法(例如 Pool().map )进行处理。可以使用 np.array_split(df, chunks) 沿着0轴轻松地将数据框分成若干部分。
from multiprocessing import Pool

with Pool() as p:
    results = p.map(parallel_wrapper, np.array_split(df, 100))

然后:

result_df = pd.concat(results, ignore_index=True) 

P.S

最后,作为锦上添花,我想展示一下parallelbar库,它可以让你在执行Pool类的mapimapimap_unordered方法时可视化进度。例如,解决方案3将如下所示:
# pip install parallelbar
from parallelbar import progress_map

progress_map(parallel_wrapper, np,array_split(df, 100), core_progress= True) 

# for 4 cores


1
如果您想将行保留为字典,可以使用to_dict。这是一个工作示例(使用starmap,因为正在传递其他参数到函数):
from multiprocessing import Pool
import pandas as pd
from itertools import repeat

def test(df_row, otherparam):
    print(df_row, otherparam)
    return True

if __name__ == '__main__':
    df = pd.DataFrame({'a': [0, 1, 2], 'b':[1, 2, 3], 'c':[10, 20, 30]})
    df.set_index('a', inplace=True)
    pool = Pool(processes=2)
    it = df.reset_index().to_dict(orient='records')
    results = pool.starmap(test, zip(it, repeat(3)))
    print(results)

输出:

{'a': 0, 'b': 1, 'c': 10} 3
{'a': 1, 'b': 2, 'c': 20} 3
{'a': 2, 'b': 3, 'c': 30} 3
[True, True, True]

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接