能否有人解释一下在MapReduce中如何计算中位数/分位数?
我了解Datafu的中位数是这样的:'n'个mapper将数据排序并将数据发送到“1”个reducer,该reducer负责对来自'n'个mapper的所有数据进行排序并找到中位数(即中间值),我的理解正确吗?
如果是这样,那么这种方法是否可以用于海量数据的计算,因为我可以清楚地看到单个reducer在执行最终任务时会遇到困难。
谢谢
能否有人解释一下在MapReduce中如何计算中位数/分位数?
我了解Datafu的中位数是这样的:'n'个mapper将数据排序并将数据发送到“1”个reducer,该reducer负责对来自'n'个mapper的所有数据进行排序并找到中位数(即中间值),我的理解正确吗?
如果是这样,那么这种方法是否可以用于海量数据的计算,因为我可以清楚地看到单个reducer在执行最终任务时会遇到困难。
谢谢
尝试在一系列数值中查找中位数(中间数)需要将一个reducer传递整个数值范围,以确定哪个是“中间”值。
根据输入集合中范围和唯一值的不同,您可以引入组合器来输出每个值的频率 - 减少发送到单个reducer的map输出数量。然后,您的reducer可以消耗排序值/频率对以识别中位数。
如果您知道值的范围和大致分布,另一种扩展此方法的方式是使用自定义分区器,将键按范围桶(0-99传输到reducer 0,100-199传输到reducer 2,依此类推)进行分布。但这将需要某些辅助作业来检查reducer输出并执行最终的中位数计算(例如,了解每个reducer中键的数量,可以计算包含中位数的reducer输出以及偏移量)。
您真的需要准确的中位数和分位数吗?
很多时候,如果你只需要近似值并进行处理,尤其是在数据分区方面,你会更好。
事实上,你可以使用近似分位数加速找到准确分位数 (实际上在O(n / p)
时间内),这里是策略的大致概述:
O(n)
)以找到真正的分位数。每个步骤都是线性时间。最昂贵的一步是第三步,因为它需要整个数据集进行重新分布,因此会生成O(n)
网络流量。
您可能可以通过选择第一次迭代的“交替”分位数来优化该过程。例如,你想找到全局中位数。你很难在线性过程中找到它,但是当它分成k个分区时,你可能可以将它缩小到数据集的1/kth。所以,每个节点不仅报告自己的中位数,还应报告 (k-1)/(2k)和(k+1)/(2k)处的对象。这应该能够显着缩小真正中位数必须位于的值范围。因此,在下一步中,每个节点可以将那些在所需范围内的对象发送到单个主节点,并仅在此范围内选择中位数。
import sys
item_to_index_range = []
total_count = 0
# Store in memory a mapping of a value to the range of indexes it has in a sorted list of all values
for line in sys.stdin:
item, count = line.strip().split("\t", 1)
new_total_count = total_count + int(count)
item_to_index_range.append((item, (total_count + 1, new_total_count + 1)))
total_count = new_total_count
# Calculate index(es) of middle items
middle_items_indexes = [(total_count / 2) + 1]
if total_count % 2 == 0:
middle_items_indexes += [total_count / 2]
# Retrieve middle item(s)
middle_items = []
for i in middle_items_indexes:
for item, index_range in item_to_index_range:
if i in range(*index_range):
middle_items.append(item)
continue
print sum(middle_items) / float(len(middle_items))
使用O((n log n)/p)的时间复杂度进行排序,然后使用O(1)的时间复杂度获取中位数。
是的...你可以使用O(n/p)的时间复杂度,但是你不能在Hadoop中直接使用排序功能。我建议只需对数据进行排序并获取中间项,除非你能证明编写并行kth最大算法的开发时间为2-20小时。