如何定义一个自定义聚合函数来对向量列进行求和?

27

我有一个包含两列的DataFrame,ID列类型为IntVec列类型为Vector,即org.apache.spark.mllib.linalg.Vector

该DataFrame长这样:

ID,Vec
1,[0,0,5]
1,[4,0,1]
1,[1,2,1]
2,[7,5,0]
2,[3,3,4]
3,[0,8,1]
3,[0,0,1]
3,[7,7,7]
....

我希望对$"ID"进行groupBy操作,然后在每个组内对向量进行求和的聚合操作。

上述示例的期望输出为:

ID,SumOfVectors
1,[5,2,7]
2,[10,8,4]
3,[7,15,9]
...

可用的聚合函数无法使用,例如 df.groupBy($"ID").agg(sum($"Vec") 会导致类转换异常。

如何实现自定义聚合函数以允许我执行向量或数组的求和或任何其他自定义操作?


3
可能是重复的问题,参考如何在Spark SQL中定义和使用用户定义的聚合函数? - nojka_kruva
如果有人想在Pyspark中尝试类似的操作,语法在这里:https://dev59.com/1rHma4cB1Zd3GeqPKVxn#54870161 - seth127
3个回答

35

Spark >= 3.0

您可以使用sum参数来使用Summarizer

import org.apache.spark.ml.stat.Summarizer

df
  .groupBy($"id")
  .agg(Summarizer.sum($"vec").alias("vec"))

Spark <= 3.0

个人而言,我不会费心去使用UDAFs。它们通常比较冗长,而且速度也不是很快 (Spark UDAF with ArrayType as bufferSchema performance issues)。相反,我会简单地使用reduceByKey / foldByKey

import org.apache.spark.sql.Row
import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.ml.linalg.{Vector, Vectors}

def dv(values: Double*): Vector = Vectors.dense(values.toArray)

val df = spark.createDataFrame(Seq(
    (1, dv(0,0,5)), (1, dv(4,0,1)), (1, dv(1,2,1)),
    (2, dv(7,5,0)), (2, dv(3,3,4)), 
    (3, dv(0,8,1)), (3, dv(0,0,1)), (3, dv(7,7,7)))
  ).toDF("id", "vec")

val aggregated = df
  .rdd
  .map{ case Row(k: Int, v: Vector) => (k, BDV(v.toDense.values)) }
  .foldByKey(BDV.zeros[Double](3))(_ += _)
  .mapValues(v => Vectors.dense(v.toArray))
  .toDF("id", "vec")

aggregated.show

// +---+--------------+
// | id|           vec|
// +---+--------------+
// |  1| [5.0,2.0,7.0]|
// |  2|[10.0,8.0,4.0]|
// |  3|[7.0,15.0,9.0]|
// +---+--------------+

仅作比较,这里是一个“简单”的UDAF。所需导入内容:

import org.apache.spark.sql.expressions.{MutableAggregationBuffer,
  UserDefinedAggregateFunction}
import org.apache.spark.ml.linalg.{Vector, Vectors, SQLDataTypes}
import org.apache.spark.sql.types.{StructType, ArrayType, DoubleType}
import org.apache.spark.sql.Row
import scala.collection.mutable.WrappedArray

类定义:

class VectorSum (n: Int) extends UserDefinedAggregateFunction {
    def inputSchema = new StructType().add("v", SQLDataTypes.VectorType)
    def bufferSchema = new StructType().add("buff", ArrayType(DoubleType))
    def dataType = SQLDataTypes.VectorType
    def deterministic = true 

    def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, Array.fill(n)(0.0))
    }

    def update(buffer: MutableAggregationBuffer, input: Row) = {
      if (!input.isNullAt(0)) {
        val buff = buffer.getAs[WrappedArray[Double]](0) 
        val v = input.getAs[Vector](0).toSparse
        for (i <- v.indices) {
          buff(i) += v(i)
        }
        buffer.update(0, buff)
      }
    }

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      val buff1 = buffer1.getAs[WrappedArray[Double]](0) 
      val buff2 = buffer2.getAs[WrappedArray[Double]](0) 
      for ((x, i) <- buff2.zipWithIndex) {
        buff1(i) += x
      }
      buffer1.update(0, buff1)
    }

    def evaluate(buffer: Row) =  Vectors.dense(
      buffer.getAs[Seq[Double]](0).toArray)
} 

还有一个使用示例:

df.groupBy($"id").agg(new VectorSum(3)($"vec") alias "vec").show

// +---+--------------+
// | id|           vec|
// +---+--------------+
// |  1| [5.0,2.0,7.0]|
// |  2|[10.0,8.0,4.0]|
// |  3|[7.0,15.0,9.0]|
// +---+--------------+

另请参阅:如何在Spark SQL中查找分组向量列的平均值?


我看到这个技巧是使用breeze.linalg.DensVector,为什么它有效而mllib.linalg的密集向量不行? - Rami
1
é—®é¢کهœ¨ن؛ژScala版وœ¬çڑ„mllib.linalg.Vectorو²،وœ‰+و–¹و³•م€‚ - zero323
@zero323,我现在正在尝试在Sark 2.0上进行操作,并将向量传递给标准化程序,但没有成功,我得到了以下错误信息:org.apache.spark.mllib.linalg.DenseVector无法转换为org.apache.spark.ml.linalg.Vector。请问在Spark 2.0上有任何更新吗? - Rami
1
@Rami 你需要导入 o.a.s.ml.linalg - zero323
@Gevorg 如果你喜欢使用 null,你可以初始化为 null 并在看到第一个非空条目后仅设置数组一次。虽然我不太喜欢这个解决方案,但它是可行的。 - zero323
显示剩余3条评论

0

使用我的版本pyspark 3.0.0,您可以轻松使用Summarizer来完成它。您的列需要是DenseVector类型。

from pyspark.ml.stat import Summarizer
sdf.groupBy("ID").agg(Summarizer.mean(sdf.Vec)).show()

注意:在pyspark中没有avg函数,但是您可以使用mean方法。

0
我建议使用以下方法(适用于Spark 2.0.2及以上版本),它可能会被优化,但非常好用。在创建UDAF实例时,您需要提前知道向量大小。
import org.apache.spark.ml.linalg._
import org.apache.spark.mllib.linalg.WeightedSparseVector
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class VectorAggregate(val numFeatures: Int)
   extends UserDefinedAggregateFunction {

private type B = Map[Int, Double]

def inputSchema: StructType = StructType(StructField("vec", new VectorUDT()) :: Nil)

def bufferSchema: StructType =
StructType(StructField("agg", MapType(IntegerType, DoubleType)) :: Nil)

def initialize(buffer: MutableAggregationBuffer): Unit =
buffer.update(0, Map.empty[Int, Double])

def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val zero = buffer.getAs[B](0)
    input match {
        case Row(DenseVector(values)) => buffer.update(0, values.zipWithIndex.foldLeft(zero){case (acc,(v,i)) => acc.updated(i, v + acc.getOrElse(i,0d))})
        case Row(SparseVector(_, indices, values)) => buffer.update(0, values.zip(indices).foldLeft(zero){case (acc,(v,i)) => acc.updated(i, v + acc.getOrElse(i,0d))}) }}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val zero = buffer1.getAs[B](0)
buffer1.update(0, buffer2.getAs[B](0).foldLeft(zero){case (acc,(i,v)) => acc.updated(i, v + acc.getOrElse(i,0d))})}

def deterministic: Boolean = true

def evaluate(buffer: Row): Any = {
    val Row(agg: B) = buffer
    val indices = agg.keys.toArray.sorted
    Vectors.sparse(numFeatures,indices,indices.map(agg)).compressed
}

def dataType: DataType = new VectorUDT()
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接