GroupBy in Dask with custom aggregate functions to build mode and corresponding count functions

So, dask has now been updated to support custom aggregate functions for groupby (thanks to the dev team and @chmp for working on this!). I am currently trying to build a mode function and a corresponding count function. Basically what I envision is that mode returns, for each grouping, a list of the most common values for a particular column, i.e. [4, 1, 2]. In addition, there would be a corresponding count function that returns the number of instances of those values, i.e. 3.
Now I am trying to implement this in code. Going by the groupby.py file, the parameters for custom aggregations are as follows:
Parameters
    ----------
    name : str
        the name of the aggregation. It should be unique, since intermediate
        result will be identified by this name.
    chunk : callable
        a function that will be called with the grouped column of each
        partition. It can either return a single series or a tuple of series.
        The index has to be equal to the groups.
    agg : callable
        a function that will be called to aggregate the results of each chunk.
        Again the argument(s) will be grouped series. If ``chunk`` returned a
        tuple, ``agg`` will be called with all of them as individual positional
        arguments.
    finalize : callable
        an optional finalizer that will be called with the results from the
        aggregation.

Here is the provided code for computing the mean:

    custom_mean = dd.Aggregation(
        'custom_mean',
        lambda s: (s.count(), s.sum()),
        lambda count, sum: (count.sum(), sum.sum()),
        lambda count, sum: sum / count,
    )
    df.groupby('g').agg(custom_mean)

I am trying to think of the best way to implement this. Currently I have the following functions:

from collections import Counter

def custom_count(x):
    count = Counter(x)
    # dict_values has no .count(), so materialize it as a list first
    freq_list = list(count.values())
    max_cnt = max(freq_list)
    total = freq_list.count(max_cnt)
    return count.most_common(total)

custom_mode = dd.Aggregation(
    'custom_mode',
    lambda s: custom_count(s),
    lambda s1: s1.extend(),
    lambda s2: ......
)

However, I am having trouble understanding how the agg part is supposed to work. Any help with this problem would be greatly appreciated.

Thanks!

1 Answer


Admittedly, the docs are currently somewhat light on detail. Thank you for bringing this issue to my attention. Please let me know if this answer helps and I will contribute an updated version of the docs to dask.

To your question: for a single return value, the different steps of the aggregation are equivalent to:

res = chunk(df.groupby('g')['col'])
res = agg(res.groupby(level=[0]))
res = finalize(res)
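Plugging the question's `custom_mean` lambdas into those three steps shows the equivalence in plain pandas (a sketch with a made-up `df` and column names):

```python
import pandas as pd

df = pd.DataFrame({'g': [0, 0, 1, 1], 'col': [1.0, 2.0, 3.0, 5.0]})

# chunk: per-partition intermediates, here count and sum per group
grouped = df.groupby('g')['col']
count, total = grouped.count(), grouped.sum()

# agg: combine the intermediates across partitions
count = count.groupby(level=0).sum()
total = total.groupby(level=0).sum()

# finalize: compute the actual mean from the combined intermediates
res = total / count

print(res)  # group 0 -> 1.5, group 1 -> 4.0
```

With a single pandas DataFrame there is only one "partition", so `agg` is a no-op here, but the same three-step shape is what dask runs across partitions.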

In these terms, mode could be implemented as follows:
def chunk(s):
    # for the comments, assume only a single grouping column, the 
    # implementation can handle multiple group columns.
    #
    # s is a grouped series. value_counts creates a multi-series like 
    # (group, value): count
    return s.value_counts()


def agg(s):
    # s is a grouped multi-index series. In .apply the full sub-df will passed
    # multi-index and all. Group on the value level and sum the counts. The
    # result of the lambda function is a series. Therefore, the result of the 
    # apply is a multi-index series like (group, value): count
    return s.apply(lambda s: s.groupby(level=-1).sum())

    # faster version using pandas internals
    s = s._selected_obj
    return s.groupby(level=list(range(s.index.nlevels))).sum()


def finalize(s):
    # s is a multi-index series of the form (group, value): count. First
    # manually group on the group part of the index. The lambda will receive a
    # sub-series with multi index. Next, drop the group part from the index.
    # Finally, determine the index with the maximum value, i.e., the mode.
    level = list(range(s.index.nlevels - 1))
    return (
        s.groupby(level=level)
        .apply(lambda s: s.reset_index(level=level, drop=True).argmax())
    )

mode = dd.Aggregation('mode', chunk, agg, finalize)

Note that this implementation does not match the dataframe's .mode function in case of ties: this version returns one of the tied values instead of all of them.
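The difference in tie handling is easy to see in plain pandas (a small made-up example):

```python
import pandas as pd

s = pd.Series([1, 1, 2, 2, 3])

# pandas' .mode returns every tied value ...
print(list(s.mode()))  # -> [1, 2]

# ... while a value_counts-based mode keeps only one of the tied values
print(s.value_counts().idxmax())
```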
The mode aggregation can now be used as in:
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({
    'col': [0, 1, 1, 2, 3] * 10,
    'g0': [0, 0, 0, 1, 1] * 10,
    'g1': [0, 0, 0, 1, 1] * 10,
})
ddf = dd.from_pandas(df, npartitions=10)

res = ddf.groupby(['g0', 'g1']).agg({'col': mode}).compute()
print(res)
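For the corresponding count that the question also asks for, the same value_counts-based intermediates can be reused with a finalize step that keeps the maximum count instead of the value it belongs to. A pandas-only sketch of that step, using the same made-up data as above (wiring `chunk`, `agg`, and such a finalize into `dd.Aggregation` would work exactly as for mode, though that wiring is not tested here):

```python
import pandas as pd

df = pd.DataFrame({
    'col': [0, 1, 1, 2, 3] * 10,
    'g0': [0, 0, 0, 1, 1] * 10,
    'g1': [0, 0, 0, 1, 1] * 10,
})

# chunk + agg combined on a single "partition": (group, value) -> count
counts = df.groupby(['g0', 'g1'])['col'].value_counts()

# finalize for the count: keep the count of the most frequent value per
# group instead of the value itself (.max() instead of .argmax())
level = list(range(counts.index.nlevels - 1))
mode_count = counts.groupby(level=level).max()

print(mode_count)  # (0, 0) -> 20, (1, 1) -> 10
```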

I'm getting a raised exception ("cannot handle non-unique multi-index!") - user48944
At which step? Could you post a minimal example (e.g., as a gist)? Did you try running the code manually, as in the first code block? - user8570642
It fails with ValueError: Metadata inference failed in _agg_finalize. The traceback is: Traceback (most recent call last): File "C:\ProgramData\Anaconda2\lib\site-packages\dask-0.15.2+14.g8d906032-py2.7.egg\dask\dataframe\groupby.py", line 1130, in agg return self.aggregate(arg, split_every=split_every, split_out=split_out) ... ValueError: Metadata inference failed in _agg_finalize. I'll try to get an MCV example. - user48944
Any chance you could revisit this? - user48944
Sorry, things are a bit busy at the moment. You can bypass pandas and use pure-python objects by returning lists as intermediate results. That prevents pandas from interpreting the results. However, the code will be slower and has some rough edges. I created an issue for the latter (https://github.com/dask/dask/issues/2708) that also contains an alternative implementation. Hope that helps. - user8570642
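The list-based workaround mentioned in this comment can be sketched in plain pandas (made-up `df`; in dask the same three steps would go into `dd.Aggregation`, which is not tested here):

```python
import pandas as pd
from collections import Counter

df = pd.DataFrame({'col': [0, 1, 1, 2, 3] * 2, 'g': [0, 0, 0, 1, 1] * 2})

# chunk: collect each group's values into a plain Python list, so pandas
# does not try to interpret the intermediate results
chunked = df.groupby('g')['col'].apply(list)

# agg: concatenate the per-partition lists of each group
# (with plain pandas there is only a single "partition")
agged = chunked.groupby(level=0).apply(lambda lists: sum(lists, []))

# finalize: take the most common element of each combined list
res = agged.apply(lambda vals: Counter(vals).most_common(1)[0][0])

print(res)  # group 0 -> 1
```

As the comment says, this is slower than the pandas-internal version, but it sidesteps the metadata inference problems with non-unique multi-indexes.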
