典型的例子:
我有一个DataFrame df
:
> df
para0 para1 para2
0 17.439020 True high
1 19.757758 True high
2 12.434424 True medium
3 14.789654 True low
4 14.131464 False high
5 9.900233 True high
6 10.977869 False low
7 8.004251 True medium
8 11.468420 False low
9 12.764453 False high
每一行都包含一个函数 foobar
的参数集合:
def foobar(r):
""" r is a row of df, does something, and it takes a long time"""
if r.para1:
x = r.para2
else:
x = 'low'
return int(r.para0), (r.Index+13)%3 == 0, x
我想将foobar
应用于df
的每一行,收集它们的结果,并将它们与它们对应的参数一起存储在一个DataFrame中。
我的(目前)解决方案:
df['count'] = 0
df['valid'] = False
df['outpt'] = ''
def wrapper(r, df):
c, v, o = foobar(r)
df.ix[r.Index,'count'] = c
df.ix[r.Index,'valid'] = v
df.ix[r.Index,'outpt'] = o
for r in df.itertuples():
wrapper(r, df)
这将产生:
> df
para0 para1 para2 count valid outpt
0 17.439020 True high 17.0 False high
1 19.757758 True high 19.0 False high
2 12.434424 True medium 12.0 True medium
3 14.789654 True low 14.0 False low
4 14.131464 False high 14.0 False low
5 9.900233 True high 9.0 True high
6 10.977869 False low 10.0 False low
7 8.004251 True medium 8.0 False medium
8 11.468420 False low 11.0 True low
9 12.764453 False high 12.0 False low
以下是我的问题:
在现实生活中,函数foobar
的计算成本很高,需要大约20-30分钟才能运行,而df
通常只有100-2000行。我可以访问一个具有八个核心的机器,由于foobar
仅依赖于当前处理的行而不依赖于其他任何内容,因此并行运行这些计算应该很容易。
当发生故障时(例如,如果有人意外关闭了机器),跳过已经处理过的行而不必从头开始进行所有操作将会更好。
我该如何做到这一点?
我尝试使用multiprocessing
,但不幸地失败了:
from multiprocessing import Pool
pool = Pool(3)
results = []
for r in df.itertuples():
results += [pool.apply_async(wrapper, r, df)]
随着:
> results[0].get()
…
/usr/lib/python3.5/multiprocessing/reduction.py in dumps(cls, obj, protocol)
48 def dumps(cls, obj, protocol=None):
49 buf = io.BytesIO()
---> 50 cls(buf, protocol).dump(obj)
51 return buf.getbuffer()
52
PicklingError: Can't pickle <class 'pandas.core.frame.Pandas'>: attribute lookup Pandas on pandas.core.frame failed
以下是我创建玩具数据帧的方法:
import pandas as pd
import numpy as np
df = pd.DataFrame({
'para0' : pd.Series(
np.random.gamma(12,size=10),
dtype=np.float),
'para1' : pd.Series(
[(True,False)[i] for i in np.random.randint(0,2,10)],
dtype=np.bool),
'para2' : pd.Categorical(
[('low','medium','high')[i] for i in np.random.randint(0,3,10)],
ordered=True),
})