我是一个初学者,正在学习Apache Spark。目前我正在尝试使用Python学习各种聚合操作。
为了让你更好地理解我所面临的问题,我发现很难理解aggregateByKey函数的工作原理,以便按“状态”计算订单数量。
我正在跟随ITVersity的YouTube播放列表,下面是我正在使用的代码和一些样本输出。
ordersRDD = sc.textFile("/user/cloudera/sqoop_import/orders")
for i in ordersRDD.take(10): print(i)
输出:
1,2013年7月25日00:00:00.0,11599,已关闭
2,2013年7月25日00:00:00.0,256,待付款
3,2013年7月25日00:00:00.0,12111,已完成
4,2013年7月25日00:00:00.0,8827,已关闭
5,2013年7月25日00:00:00.0,11318,已完成
6,2013年7月25日00:00:00.0,7130,已完成
7,2013年7月25日00:00:00.0,4530,已完成
8,2013年7月25日00:00:00.0,2911,处理中
9,2013年7月25日00:00:00.0,5657,待付款
10,2013年7月25日00:00:00.0,5648,待付款
ordersMap = ordersRDD.map(lambda x: (x.split(",")[3], x))
输出结果:
('CLOSED','1,2013-07-25 00:00:00.0,11599,CLOSED')
('PENDING_PAYMENT','2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT')
('COMPLETE','3,2013-07-25 00:00:00.0,12111,COMPLETE')
('CLOSED','4,2013-07-25 00:00:00.0,8827,CLOSED')
('COMPLETE','5,2013-07-25 00:00:00.0,11318,COMPLETE')
('COMPLETE','6,2013-07-25 00:00:00.0,7130,COMPLETE')
('COMPLETE','7,2013-07-25 00:00:00.0,4530,COMPLETE')
('PROCESSING','8,2013-07-25 00:00:00.0,2911,PROCESSING')
('PENDING_PAYMENT','9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT')
('PENDING_PAYMENT','10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT')
ordersByStatus = ordersMap.aggregateByKey(0, lambda acc, val: acc + 1, lambda acc,val: acc + val)
for i in ordersByStatus.take(10): print(i)
最终输出:
(u'SUSPECTED_FRAUD',1558)
(u'CANCELED',1428)
(u'COMPLETE',22899)
(u'PENDING_PAYMENT',15030)
(u'PENDING',7610)
(u'CLOSED',7556)
(u'ON_HOLD',3798)
(u'PROCESSING',8275)
(u'PAYMENT_REVIEW',729)
我难以理解的问题是:
1.为什么aggregateByKey函数需要接受2个lambda函数作为参数?
2.可视化第一个lambda函数的作用?
3.可视化第二个lambda函数的作用?
如果可能的话,您能否用一些简单的块图解释aggregateByKey的工作原理?也许还有一些中间计算?非常感谢您的帮助!
谢谢,
Shiv