无法让Spark聚合器正常工作的问题

4

我想尝试在Scala Spark中使用聚合器,但是我似乎无法同时使用select函数和groupBy/agg函数(使用我的当前实现,agg函数无法编译)。我的聚合器如下所示,应该很容易理解。

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

/** Stores the number of true counts (tc) and false counts (fc) */
case class Counts(var tc: Long, var fc: Long)

/** Count the number of true and false occurances of a function */
class BooleanCounter[A](f: A => Boolean) extends Aggregator[A, Counts, Counts] with Serializable {
  // Initialize both counts to zero
  def zero: Counts = Counts(0L, 0L) 
  // Sum counts for intermediate value and new value
  def reduce(acc: Counts, other: A): Counts = { 
    if (f(other)) acc.tc += 1 else acc.fc += 1
    acc 
  }
  // Sum counts for intermediate values
  def merge(acc1: Counts, acc2: Counts): Counts = { 
    acc1.tc += acc2.tc
    acc1.fc += acc2.fc
    acc1
  }
  // Return results
  def finish(acc: Counts): Counts = acc 
  // Encoder for intermediate value type
  def bufferEncoder: Encoder[Counts] = Encoders.product[Counts]
  // Encoder for return type
  def outputEncoder: Encoder[Counts] = Encoders.product[Counts]
}

以下是我的测试代码。
val ds: Dataset[Employee] = Seq(
  Employee("John", 110),
  Employee("Paul", 100),
  Employee("George", 0), 
  Employee("Ringo", 80) 
).toDS()

val salaryCounter = new BooleanCounter[Employee]((r: Employee) => r.salary < 10).toColumn
// Usage works fine 
ds.select(salaryCounter).show()
// Causes an error
ds.groupBy($"name").agg(salaryCounter).show()
< p >第一个 salaryCounter 使用正常,但第二个会导致以下编译错误。

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Employee 

Databricks有一个教程,它与Spark 2.3有关,但相当复杂。此外,还有一份较早的教程,使用了Spark 1.6的实验性功能。

这个教程是数据聚合器,看起来比较复杂。

这篇较早的教程则介绍了使用Spark 1.6中一项实验性功能的类型安全性。


“ds.groupBy($“name”).agg(salaryCounter).show()”这个操作有何意义?早期聚合的输出仅返回一个包含2列(tc,fc)和1行的数据集。你对这个操作的期望输出是什么?显然,当应用于“ds.groupBy($“name”)”时,UDAF不起作用,因为在这种情况下提供给UDAF的输入并不是Employee。” - sujit
理想情况下,这将生成一个数据集,其中每一行对应于单个名称,每个名称的列(tc,fc)分别对应于具有给定名称的员工数量,其收入低于或高于10美元。 UDAF不是应用于ds.groupBy($“name”),而是传递给.agg函数。请参阅我链接的教程,因为它们具有似乎有效的示例用法。 - Joshua Howard
1个回答

4

您将“静态类型”和“动态类型”的API混淆了。 要使用前者版本,您应该在KeyValueGroupedDataset上调用agg,而不是RelationalGroupedDataset

ds.groupByKey(_.name).agg(salaryCounter)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接