无法理解aggregateByKey和combineByKey的工作原理

3

我是一个初学者,正在学习Apache Spark。目前我正在尝试使用Python学习各种聚合操作。

为了让你更好地理解我所面临的问题,我发现很难理解aggregateByKey函数的工作原理,以便按“状态”计算订单数量。

我正在跟随ITVersity的YouTube播放列表,下面是我正在使用的代码和一些样本输出。

ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
for i in ordersRDD.take(10): print(i)

输出:
1,2013年7月25日00:00:00.0,11599,已关闭
2,2013年7月25日00:00:00.0,256,待付款
3,2013年7月25日00:00:00.0,12111,已完成
4,2013年7月25日00:00:00.0,8827,已关闭
5,2013年7月25日00:00:00.0,11318,已完成
6,2013年7月25日00:00:00.0,7130,已完成
7,2013年7月25日00:00:00.0,4530,已完成
8,2013年7月25日00:00:00.0,2911,处理中
9,2013年7月25日00:00:00.0,5657,待付款
10,2013年7月25日00:00:00.0,5648,待付款

ordersMap = ordersRDD.map(lambda x: (x.split(",")[3], x))

输出结果:
('CLOSED','1,2013-07-25 00:00:00.0,11599,CLOSED')
('PENDING_PAYMENT','2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT')
('COMPLETE','3,2013-07-25 00:00:00.0,12111,COMPLETE')
('CLOSED','4,2013-07-25 00:00:00.0,8827,CLOSED')
('COMPLETE','5,2013-07-25 00:00:00.0,11318,COMPLETE')
('COMPLETE','6,2013-07-25 00:00:00.0,7130,COMPLETE')
('COMPLETE','7,2013-07-25 00:00:00.0,4530,COMPLETE')
('PROCESSING','8,2013-07-25 00:00:00.0,2911,PROCESSING')
('PENDING_PAYMENT','9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT')
('PENDING_PAYMENT','10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT')

ordersByStatus = ordersMap.aggregateByKey(0, lambda acc, val: acc + 1, lambda acc,val: acc + val)
for i in ordersByStatus.take(10): print(i)

最终输出:
(u'SUSPECTED_FRAUD',1558)
(u'CANCELED',1428)
(u'COMPLETE',22899)
(u'PENDING_PAYMENT',15030)
(u'PENDING',7610)
(u'CLOSED',7556)
(u'ON_HOLD',3798)
(u'PROCESSING',8275)
(u'PAYMENT_REVIEW',729)

我难以理解的问题是:
1.为什么aggregateByKey函数需要接受2个lambda函数作为参数?
2.可视化第一个lambda函数的作用?
3.可视化第二个lambda函数的作用?

如果可能的话,您能否用一些简单的块图解释aggregateByKey的工作原理?也许还有一些中间计算?非常感谢您的帮助!

谢谢,
Shiv


@JustinPihony 谢谢你指引我到这里。非常有用。 - Shiv Konar
1个回答

5

Spark RDD(弹性分布式数据集)被划分为多个分区,因此当您对所有数据执行聚合函数时,您将首先聚合每个分区内的数据(分区就是数据的一个子集)。然后,您需要告诉Spark如何聚合这些分区。

第一个λ函数告诉Spark在遇到新值时如何更改运行计数器(累加器)。由于您正在计数,因此只需将1添加到累加器中即可。在一个分片内,如果当前运行计数为4并添加了另一个值,则运行计数应为4 + 1 = 5。因此,您的第一个λ函数是:

lambda acc, val: acc + 1

第二个lambda函数告诉Spark如何将一个数据切片中的运行计数与另一个数据切片中的运行计数相结合。如果一个切片有5个计数,而第二个切片有7个计数,则组合计数为5 + 7 = 12。因此,您的第二个函数最好这样编写:
lambda acc1, acc2: acc1 + acc2

唯一需要注意的是,所有操作都是以“按键”为基础进行的。累加器(计数器)的值取决于键值。

谢谢您的回答!现在我完全明白了。我已经点赞您的回答,但我的声望还不足以公开显示。感谢您的帮助! - Shiv Konar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接