Here is a simple example:
import pyspark
from pyspark.sql import window
import pyspark.sql.functions as sf

sc = pyspark.SparkContext(appName="test")
sqlcontext = pyspark.SQLContext(sc)
data = sqlcontext.createDataFrame([("Bob", "M", "Boston", 1, 20),
                                   ("Cam", "F", "Cambridge", 1, 25),
                                   ("Lin", "F", "Cambridge", 1, 25),
                                   ("Cat", "M", "Boston", 1, 20),
                                   ("Sara", "F", "Cambridge", 1, 15),
                                   ("Jeff", "M", "Cambridge", 1, 25),
                                   ("Bean", "M", "Cambridge", 1, 26),
                                   ("Dave", "M", "Cambridge", 1, 21)],
                                  ["name", "gender", "city", "donation", "age"])
data.show()
Output:
+----+------+---------+--------+---+
|name|gender|     city|donation|age|
+----+------+---------+--------+---+
| Bob|     M|   Boston|       1| 20|
| Cam|     F|Cambridge|       1| 25|
| Lin|     F|Cambridge|       1| 25|
| Cat|     M|   Boston|       1| 20|
|Sara|     F|Cambridge|       1| 15|
|Jeff|     M|Cambridge|       1| 25|
|Bean|     M|Cambridge|       1| 26|
|Dave|     M|Cambridge|       1| 21|
+----+------+---------+--------+---+
Define a window:
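win_spec = (window.Window
            .partitionBy(['gender', 'city'])
            .orderBy('age')  # without an ordering, row order inside a partition is not guaranteed
            .rowsBetween(window.Window.unboundedPreceding, 0))
# window.Window.unboundedPreceding -- the first row of the partition (group)
# .rowsBetween(..., 0) -- 0 means the current row; a bound of -2 would mean
#                         the row two rows before the current one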
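The frame bounds above can be made concrete with a minimal pure-Python sketch. It shows what a ROWS frame computes over one partition that is already ordered, and it does not involve Spark. The function name frame_sums is hypothetical. Passing start=None stands in for Window.unboundedPreceding, and offsets are relative to the current row, as in rowsBetween. The donation values are illustrative: in the example data every donation is 1, which would hide the difference between frames.

```python
from typing import List, Optional


def frame_sums(values: List[int], start: Optional[int], end: int) -> List[int]:
    """Sum a ROWS frame for every row of one already-ordered partition.

    For row i the frame is values[i + start .. i + end], inclusive.
    start=None stands in for Window.unboundedPreceding (the partition's
    first row); end=0 is the current row. Bounds that run past either
    end of the partition are clipped to it.
    """
    n = len(values)
    result = []
    for i in range(n):
        lo = 0 if start is None else max(0, i + start)
        hi = min(n - 1, i + end)
        result.append(sum(values[lo:hi + 1]))
    return result


donations = [1, 2, 3, 4, 5]  # illustrative values, not from the example data
print(frame_sums(donations, None, 0))  # [1, 3, 6, 10, 15]: like rowsBetween(unboundedPreceding, 0)
print(frame_sums(donations, -2, 0))    # [1, 3, 6, 9, 12]: current row plus the two rows before it
```

In Spark the second frame would be written as rowsBetween(-2, 0). It gives a rolling sum over three rows instead of a running total.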
Now here is a pitfall:
temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))
It fails with:
TypeError                                 Traceback (most recent call last)
<ipython-input-9-b467d24b05cd> in <module>()
----> 1 temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))

/Users/mupadhye/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/column.pyc in __iter__(self)
    238
    239     def __iter__(self):
--> 240         raise TypeError("Column is not iterable")
    241
    242

TypeError: Column is not iterable
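The last frame of the traceback shows the mechanism. Python's built-in sum() begins by iterating over its argument, and PySpark's Column defines __iter__ to raise TypeError. The sketch below reproduces this without Spark. FakeColumn is a hypothetical stand-in that copies only that one method; try_builtin_sum is likewise a hypothetical helper.

```python
class FakeColumn:
    """Hypothetical stand-in for pyspark.sql.Column: only copies its __iter__."""

    def __iter__(self):
        # Same error message as PySpark's Column.__iter__ in the traceback above.
        raise TypeError("Column is not iterable")


def try_builtin_sum(obj):
    """Call Python's built-in sum on obj; return the TypeError message, or None."""
    try:
        sum(obj)  # built-in sum() first asks obj for an iterator
    except TypeError as exc:
        return str(exc)
    return None


print(try_builtin_sum(FakeColumn()))  # Column is not iterable
print(try_builtin_sum([1, 2, 3]))     # None: a list iterates fine
```

Keeping PySpark's functions behind a module alias such as sf makes it explicit which sum is being called.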
This happens because Python's built-in sum function was used instead of PySpark's. The fix is to use the sum function from pyspark.sql.functions (imported above as sf):
temp = data.withColumn('CumSumDonation', sf.sum(data.donation).over(win_spec))
temp.show()
This gives:
+----+------+---------+--------+---+--------------+
|name|gender|     city|donation|age|CumSumDonation|
+----+------+---------+--------+---+--------------+
|Sara|     F|Cambridge|       1| 15|             1|
| Cam|     F|Cambridge|       1| 25|             2|
| Lin|     F|Cambridge|       1| 25|             3|
| Bob|     M|   Boston|       1| 20|             1|
| Cat|     M|   Boston|       1| 20|             2|
|Dave|     M|Cambridge|       1| 21|             1|
|Jeff|     M|Cambridge|       1| 25|             2|
|Bean|     M|Cambridge|       1| 26|             3|
+----+------+---------+--------+---+--------------+
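As a cross-check of the table above, this pure-Python sketch recomputes CumSumDonation from the same eight rows. It groups by (gender, city), orders each group by age, and keeps a running total of donation. Ordering by age is an assumption that matches the row order shown in the table. Rows with equal ages keep their input order here because Python's sorted() is stable; Spark gives no such guarantee for ties. The function name cumsum_by_group is hypothetical.

```python
from itertools import groupby

# The eight rows from the example: (name, gender, city, donation, age).
rows = [
    ("Bob", "M", "Boston", 1, 20),
    ("Cam", "F", "Cambridge", 1, 25),
    ("Lin", "F", "Cambridge", 1, 25),
    ("Cat", "M", "Boston", 1, 20),
    ("Sara", "F", "Cambridge", 1, 15),
    ("Jeff", "M", "Cambridge", 1, 25),
    ("Bean", "M", "Cambridge", 1, 26),
    ("Dave", "M", "Cambridge", 1, 21),
]


def cumsum_by_group(records):
    """Return {name: running donation total within its (gender, city) group}."""
    # Sort by the partition key, then by age, so each group is contiguous and ordered.
    ordered = sorted(records, key=lambda r: (r[1], r[2], r[4]))
    result = {}
    for _, group in groupby(ordered, key=lambda r: (r[1], r[2])):
        total = 0  # restart the running total for each partition
        for name, _, _, donation, _ in group:
            total += donation  # frame: first row of the group .. current row
            result[name] = total
    return result


print(cumsum_by_group(rows))
```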
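...limitation of HiveContext. You are just using the embedded Derby as the metastore, which is not meant for production use. See my answer at https://dev59.com/E5Lea4cB1Zd3GeqP7t6y. - zero323

...HiveContext, window functions are not supported (SPARK-11001). If you use sqlContext in spark-shell/pyspark, it is initialized as a HiveContext as long as Spark was built with Hive support. - zero323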