在Spark SQL中使用collect_list和collect_set

16
根据文档,Spark SQL应该支持collect_setcollect_list函数。然而,我在运行使用Docker镜像的Spark 1.6.0时无法正常工作。我尝试在Scala中执行此操作:
import org.apache.spark.sql.functions._ 

df.groupBy("column1") 
  .agg(collect_set("column2")) 
  .show() 

并且在运行时收到以下错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException: undefined function collect_set; 

尝试使用pyspark也失败了,文档说明这些函数是Hive UDAF的别名,但我无法弄清如何启用这些函数。

如何修复?谢谢!

1个回答

33

Spark 2.0+:

SPARK-10605 引入了原生的 collect_listcollect_set 实现。不再需要使用支持Hive的 SparkSessionHiveContext

Spark 2.0-SNAPSHOT (在2016-05-03之前):

你必须为给定的 SparkSession 启用Hive支持:

在Scala中:

val spark = SparkSession.builder
  .master("local")
  .appName("testing")
  .enableHiveSupport()  // <- enable Hive support.
  .getOrCreate()

在Python中:

spark = (SparkSession.builder
    .enableHiveSupport()
    .getOrCreate())

Spark < 2.0:

使用Hive UDFs(参见https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF)需要使用支持Hive的Spark版本(当您使用预编译的二进制文件时,这已经包含在内),并使用HiveContext初始化SparkContext

Scala示例:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext

val sqlContext: SQLContext = new HiveContext(sc) 

在Python中:

from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)

关于1.6.1版本怎么样? 文档中写着可用的是“@since 1.6.0”,但我仍然遇到了那个错误。 - Khachatur Stepanyan
嘿@zero323,我正在尝试在Spark 1.5.0中使用'collect_list'函数。我已经创建了hive上下文,但是无法弄清楚如何导入该函数。这不会编译:.groupBy(providerData(“PRVSEQ”),providerData(“PROV_NUM”)) .agg(collect_list(regexp_replace(triggerReport(“match_type”),“_(Individual | Practice)Model。”,“”))) - nemo
@VijayRatnagiri 它在1.6中被引入。就我所记得的而言,你应该能够在已注册的临时表上使用原生SQL查询。 - zero323
1
@KhachaturStepanyan 在1.6版本中仍需要Hive支持。 - zero323

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接