在MapReduce中计算中位数

18

能否有人解释一下在MapReduce中如何计算中位数/分位数?

我了解Datafu的中位数是这样的:'n'个mapper将数据排序并将数据发送到“1”个reducer,该reducer负责对来自'n'个mapper的所有数据进行排序并找到中位数(即中间值),我的理解正确吗?

如果是这样,那么这种方法是否可以用于海量数据的计算,因为我可以清楚地看到单个reducer在执行最终任务时会遇到困难。

谢谢

4个回答

16

尝试在一系列数值中查找中位数(中间数)需要将一个reducer传递整个数值范围,以确定哪个是“中间”值。

根据输入集合中范围和唯一值的不同,您可以引入组合器来输出每个值的频率 - 减少发送到单个reducer的map输出数量。然后,您的reducer可以消耗排序值/频率对以识别中位数。

如果您知道值的范围和大致分布,另一种扩展此方法的方式是使用自定义分区器,将键按范围桶(0-99传输到reducer 0,100-199传输到reducer 2,依此类推)进行分布。但这将需要某些辅助作业来检查reducer输出并执行最终的中位数计算(例如,了解每个reducer中键的数量,可以计算包含中位数的reducer输出以及偏移量)。


8

您真的需要准确的中位数和分位数吗?

很多时候,如果你只需要近似值并进行处理,尤其是在数据分区方面,你会更好。

事实上,你可以使用近似分位数加速找到准确分位数 (实际上在O(n / p)时间内),这里是策略的大致概述:

  1. 每个分区的映射器计算所需的分位数,并将它们输出到一个新的数据集中。该数据集应当比原始数据集小几个数量级(除非要求过多的分位数!)
  2. 在此数据集中,类似于“中位数”,再次计算分位数。这些是你的初始估计。
  3. 根据这些分位数(或者使用这种方法获得的其他分区)重新分割数据。目标是最终确保真正的分位数在一个分区中,并且每个分区最多只有一个所需的分位数
  4. 在每个分区中执行快速选择(O(n))以找到真正的分位数。

每个步骤都是线性时间。最昂贵的一步是第三步,因为它需要整个数据集进行重新分布,因此会生成O(n)网络流量。 您可能可以通过选择第一次迭代的“交替”分位数来优化该过程。例如,你想找到全局中位数。你很难在线性过程中找到它,但是当它分成k个分区时,你可能可以将它缩小到数据集的1/kth。所以,每个节点不仅报告自己的中位数,还应报告 (k-1)/(2k)和(k+1)/(2k)处的对象。这应该能够显着缩小真正中位数必须位于的值范围。因此,在下一步中,每个节点可以将那些在所需范围内的对象发送到单个主节点,并仅在此范围内选择中位数。


在这种方法中,找到精确的分位数可能非常昂贵,但这种方法可能比朴素方法更好。步骤1到4实际上有助于将集合分成两半,并在较小的空间中解决相同的问题。但是,在这种方法中,可能需要进行logn次步骤1到步骤4的迭代才能真正获得分位数。 - Sourabh

2
在许多实际场景中,数据集中的值的基数相对较小。在这种情况下,可以通过两个MapReduce作业有效地解决问题:
  1. 计算数据集中值的频率(基本上是Word Count作业)
  2. 身份映射器+一个基于对计算中位数的减速器
第一项工作将大大减少数据量,并且可以完全并行执行。第二项工作的减速器只需要处理n(n=您的值集的基数)个项目,而不是所有值,就像使用朴素方法一样。
以下是第二项工作的示例减速器。它是一个Python脚本,可直接在Hadoop流中使用。假设数据集中的值为int,但可以轻松适应double。
import sys

item_to_index_range = []
total_count = 0

# Store in memory a mapping of a value to the range of indexes it has in a sorted list of all values
for line in sys.stdin:
    item, count = line.strip().split("\t", 1)
    new_total_count = total_count + int(count)
    item_to_index_range.append((item, (total_count + 1,   new_total_count + 1)))
    total_count = new_total_count

# Calculate index(es) of middle items
middle_items_indexes = [(total_count / 2) + 1]
if total_count % 2 == 0:
    middle_items_indexes += [total_count / 2]

# Retrieve middle item(s) 
middle_items = []
for i in middle_items_indexes:
    for item, index_range in item_to_index_range:
        if i in range(*index_range):
            middle_items.append(item)
            continue

print sum(middle_items) / float(len(middle_items))

这份答案源自于Chris White回答。回答中提到可以使用combiner来计算值的频率。然而,在MapReduce中,并不能保证combiner会一直被执行。这会带来以下影响:
  • reducer 首先需要计算最终的 <value - frequency> 对,再计算中位数。
  • 在最坏的情况下,combiner永远不会被执行,reducer必须处理所有单独的值。

2

使用O((n log n)/p)的时间复杂度进行排序,然后使用O(1)的时间复杂度获取中位数。

是的...你可以使用O(n/p)的时间复杂度,但是你不能在Hadoop中直接使用排序功能。我建议只需对数据进行排序并获取中间项,除非你能证明编写并行kth最大算法的开发时间为2-20小时。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接