在并行处理下,高效地将函数应用于分组的pandas DataFrame

89

我经常需要对一个非常大的混合数据类型的DataFrame分组应用一个函数,并希望利用多个核心。

我可以从这些分组创建一个迭代器并使用multiprocessing模块,但这样效率不高,因为每个分组和函数的结果都必须进行pickle处理以在进程间传递信息。

是否有任何方法可以避免pickle,甚至完全避免复制DataFrame? 看起来multiprocessing模块的共享内存函数仅限于numpy数组。还有其他选项吗?


据我所知,没有办法共享任意对象。我在想,如果通过多进程获得的收益比 pickling 要花费更多的时间,那么你应该寻找一种可能性,为每个进程创建更大的工作包,以减少相对 pickling 的时间。另一个可能性是在创建组时使用 multiprocessing。 - Sebastian Werk
3
我使用UWSGI、Flask和预分叉(forking)来做类似的事情:我将pandas数据帧加载到一个进程中,将其分叉x次(使其成为一个共享内存对象),然后从另一个Python进程中调用这些进程,进行结果的连接(concat)。目前我使用JSON作为通讯过程,但这还是高度实验性的,现在有一个更好的选择:http://pandas.pydata.org/pandas-docs/dev/io.html#msgpack-experimental - Carst
顺便问一下,你有没有看过使用分块的HDF5?(HDF5不适用于并发写入,但你也可以将其保存到单独的文件中,最后再将它们连接起来) - Carst
7
这将针对0.14版本进行,参见此问题:https://github.com/pydata/pandas/issues/5751。 - Jeff
4
@Jeff被推到0.15 =( 的意思是@Jeff的某个事物或价值被降低到了0.15,可能指股票价格、比特币价格等。 - pyCthon
显示剩余2条评论
1个回答

12
从上面的评论来看,似乎计划在某个时间将此功能引入到pandas中(我也注意到了一个看起来很有趣的rosetta项目)。
但是,在每个并行功能都被纳入pandas之前,我注意到可以使用cython + OpenMP和C ++ 直接编写高效且非内存复制的并行增强功能到pandas中。
这是一个编写并行分组求和的简短示例,使用方法如下:
import pandas as pd
import para_group_demo

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)

输出结果为:

     sum
key     
0      6
1      11
2      4

注意:毫无疑问,这个简单的例子的功能最终将成为pandas的一部分。然而,有些事情在一段时间内更自然地并行化为C++,了解如何将其与pandas结合起来非常重要。


为了实现这一点,我编写了一个简单的单源文件扩展,其代码如下。
它从一些导入和类型定义开始。
from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map

cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange

import pandas as pd

ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t

C++中的unordered_map类型用于单线程求和,而vector用于多线程求和。

现在看看函数sum。它从类型化内存视图开始,以便快速访问:

def sum(crit, vals):
    cdef int64_t[:] crit_view = crit.values
    cdef int64_t[:] vals_view = vals.values

该函数将半等分给线程(这里硬编码为4),并让每个线程对其范围内的条目求和。
    cdef uint64_t num_threads = 4
    cdef uint64_t l = len(crit)
    cdef uint64_t s = l / num_threads + 1
    cdef uint64_t i, j, e
    cdef counts_vec_t counts
    counts = counts_vec_t(num_threads)
    counts.resize(num_threads)
    with cython.boundscheck(False):
        for i in prange(num_threads, nogil=True): 
            j = i * s
            e = j + s
            if e > l:
                e = l
            while j < e:
                counts[i][crit_view[j]] += vals_view[j]
                inc(j)

当线程完成时,该函数将所有结果(来自不同范围)合并为单个 unordered_map
    cdef counts_t total
    cdef counts_it_t it, e_it
    for i in range(num_threads):
        it = counts[i].begin()
        e_it = counts[i].end()
        while it != e_it:
            total[deref(it).first] += deref(it).second
            inc(it)        

剩下的就是创建一个 DataFrame 并返回结果:

    key, sum_ = [], []
    it = total.begin()
    e_it = total.end()
    while it != e_it:
        key.append(deref(it).first)
        sum_.append(deref(it).second)
        inc(it)

    df = pd.DataFrame({'key': key, 'sum': sum_})
    df.set_index('key', inplace=True)
    return df

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接