如何使用sqlContext计算累积总和

4
我知道我们可以使用 Pyspark中的窗口函数 来计算累积和。但是Window仅支持HiveContext而不支持SQLContext。我需要使用SQLContext,因为HiveContext无法在多进程中运行。
有没有一种有效的方法可以使用SQLContext计算累积和?一个简单的方法是将数据加载到驱动程序的内存中,并使用numpy.cumsum,但缺点是数据需要能够适应内存。

我已经广泛使用了sqlContext中的窗口函数。 - KrisP
@KrisP,你能否给我一个使用Window和sqlContext的例子吗?我非常感激,因为我在这个问题上卡住了。谢谢!我的Window函数可以在使用HiveContext时工作,但是当我使用sqlContext时会崩溃,并显示错误“请注意,使用窗口函数当前需要HiveContext;” - Michael
1
这并不是HiveContext的限制。你只是使用了嵌入式Derby作为元数据存储,而这并不适用于生产环境。请参考我在https://dev59.com/E5Lea4cB1Zd3GeqP7t6y的回答。 - zero323
@KrisP:如果没有使用HiveContext,则不支持窗口函数(SPARK-11001)。 如果在spark-shell/pyspark中使用sqlContext,只要Spark已经构建了Hive支持,则会初始化HiveContext - zero323
1
不需要更改Spark代码,但您需要一些DevOps技能。 - zero323
显示剩余3条评论
4个回答

11

我不确定这是否符合您的要求,但以下是两个使用 sqlContext 计算累积总和的示例:

首先,当您想将其按某些类别进行分区时:

from pyspark.sql.types import StructType, StringType, LongType
from pyspark.sql import SQLContext

rdd = sc.parallelize([
    ("Tablet", 6500), 
    ("Tablet", 5500), 
    ("Cell Phone", 6000), 
    ("Cell Phone", 6500), 
    ("Cell Phone", 5500)
    ])

schema = StructType([
    StructField("category", StringType(), False),
    StructField("revenue", LongType(), False)
    ])

df = sqlContext.createDataFrame(rdd, schema)

df.registerTempTable("test_table")

df2 = sqlContext.sql("""
SELECT
    category,
    revenue,
    sum(revenue) OVER (PARTITION BY category ORDER BY revenue) as cumsum
FROM
test_table
""")

输出:

[Row(category='Tablet', revenue=5500, cumsum=5500),
 Row(category='Tablet', revenue=6500, cumsum=12000),
 Row(category='Cell Phone', revenue=5500, cumsum=5500),
 Row(category='Cell Phone', revenue=6000, cumsum=11500),
 Row(category='Cell Phone', revenue=6500, cumsum=18000)]

第二种情况是当你只想对一个变量进行累加时。 将 df2 更改为以下内容:

df2 = sqlContext.sql("""
SELECT
    category,
    revenue,
    sum(revenue) OVER (ORDER BY revenue, category) as cumsum
FROM
test_table
""")

输出:

[Row(category='Cell Phone', revenue=5500, cumsum=5500),
 Row(category='Tablet', revenue=5500, cumsum=11000),
 Row(category='Cell Phone', revenue=6000, cumsum=17000),
 Row(category='Cell Phone', revenue=6500, cumsum=23500),
 Row(category='Tablet', revenue=6500, cumsum=30000)]

希望这能帮到你。在收集数据后使用np.cumsum不是很有效,特别是如果数据集很大的情况下。你可以尝试另一种方式,使用简单的RDD转换,比如groupByKey(),然后使用map来计算每个键的累加和,最后再进行reduce。


谢谢,但你的解决方案适用于hiveContext而不是sqlContext。你能输出一下你的sqlContext吗?应该显示它是一个hiveContext。 - Michael

5

以下是一个简单的例子:

import pyspark
from pyspark.sql import window
import pyspark.sql.functions as sf


sc = pyspark.SparkContext(appName="test")
sqlcontext = pyspark.SQLContext(sc)

data = sqlcontext.createDataFrame([("Bob", "M", "Boston", 1, 20),
                                   ("Cam", "F", "Cambridge", 1, 25),
                                  ("Lin", "F", "Cambridge", 1, 25),
                                  ("Cat", "M", "Boston", 1, 20),
                                  ("Sara", "F", "Cambridge", 1, 15),
                                  ("Jeff", "M", "Cambridge", 1, 25),
                                  ("Bean", "M", "Cambridge", 1, 26),
                                  ("Dave", "M", "Cambridge", 1, 21),], 
                                 ["name", 'gender', "city", 'donation', "age"])


data.show()

输出结果

+----+------+---------+--------+---+
|name|gender|     city|donation|age|
+----+------+---------+--------+---+
| Bob|     M|   Boston|       1| 20|
| Cam|     F|Cambridge|       1| 25|
| Lin|     F|Cambridge|       1| 25|
| Cat|     M|   Boston|       1| 20|
|Sara|     F|Cambridge|       1| 15|
|Jeff|     M|Cambridge|       1| 25|
|Bean|     M|Cambridge|       1| 26|
|Dave|     M|Cambridge|       1| 21|
+----+------+---------+--------+---+

定义一个窗口

win_spec = (window.Window
                  .partitionBy(['gender', 'city'])
                  .rowsBetween(window.Window.unboundedPreceding, 0))

# window.Window.unboundedPreceding -- 组的第一行 # .rowsBetween(..., 0) -- 0 表示当前行,如果指定为 -2 则表示当前行前两行
现在有一个陷阱:
temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))

出现错误:

TypeErrorTraceback (most recent call last)
<ipython-input-9-b467d24b05cd> in <module>()
----> 1 temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))

/Users/mupadhye/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/column.pyc in __iter__(self)
    238 
    239     def __iter__(self):
--> 240         raise TypeError("Column is not iterable")
    241 
    242     # string methods

TypeError: Column is not iterable

这是因为使用了Python的sum函数而不是pyspark的。修复方法是使用pyspark.sql.functions.sum中的sum函数:

temp = data.withColumn('AgeSum',sf.sum(data.donation).over(win_spec))
temp.show()

will give:

+----+------+---------+--------+---+--------------+
|name|gender|     city|donation|age|CumSumDonation|
+----+------+---------+--------+---+--------------+
|Sara|     F|Cambridge|       1| 15|             1|
| Cam|     F|Cambridge|       1| 25|             2|
| Lin|     F|Cambridge|       1| 25|             3|
| Bob|     M|   Boston|       1| 20|             1|
| Cat|     M|   Boston|       1| 20|             2|
|Dave|     M|Cambridge|       1| 21|             1|
|Jeff|     M|Cambridge|       1| 25|             2|
|Bean|     M|Cambridge|       1| 26|             3|
+----+------+---------+--------+---+--------------+

1
你的示例中未定义win_spec,请添加它。这将有助于理解您优秀的示例。 - Mike
糟糕,我的错 @Mike,我会努力找出我的代码库 ;) 希望一切顺利。 - muon

1
在尝试解决类似问题时,我着陆到这个线程,并使用此代码解决了我的问题。不确定我是否遗漏了OP的一部分,但这是一种汇总SQLContext列的方法:
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext

sc = SparkContext() 
sc.setLogLevel("ERROR")
conf = SparkConf()
conf.setAppName('Sum SQLContext Column')
conf.set("spark.executor.memory", "2g")
sqlContext = SQLContext(sc)

def sum_column(table, column):
    sc_table = sqlContext.table(table)
    return sc_table.agg({column: "sum"})

sum_column("db.tablename", "column").show()

0

Windows函数不仅适用于HiveContext,也可以在sqlContext中使用:

from pyspark.sql.window import *

myPartition=Window.partitionBy(['col1','col2','col3'])

temp= temp.withColumn("#dummy",sum(temp.col4).over(myPartition))

只有在Spark 2.0+上才能使用SQLContext的窗口函数。对于Spark版本1.4〜1.6,必须使用HiveContext。 - Daniel de Paula
不,它们是从Spark版本1.4引入的。 - Abhishek Kgsk
1
它们自1.4版本以来就存在了,但在Spark 2之前,需要使用HiveContext。然而,在许多发行版中,“sqlContext”的实例的默认类在spark-shell和pyspark中都是HiveContext,因此这可能会导致一些混淆,人们会认为可以使用普通的SQLContext来使用窗口函数。您可以参考此问题获取更多信息:https://dev59.com/MpXfa4cB1Zd3GeqPdlgk - Daniel de Paula

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接