如何使用Spark的RDD执行矩阵向量点积

3

现在有一个矩阵和一个向量,我想对它们进行点积操作。以下是Scala代码:

val matrix = sc.parallelize(List(
  (("v1","v1"),2),(("v1","v2"),4),(("v1","v3"),1),(("v2","v2"),5),
  (("v2","v3"),1),(("v3","v3"),2)))

val vector = sc.parallelize(List(("v1",4),("v2",1),("v3",5)))

val dotproduct = matrix.flatMap{x => { 
  vector.flatMap { y => { 
    if(x._1._2 == y._1) Tuple2(x._1._1, x._2 * y._2)
  }}
}}.reduceByKey((_,_) => _+_)

但是出现了以下错误:
<console>:25: error: type mismatch;
found   : (String, Int)
required: TraversableOnce[?]
val dotproduct = matrix.flatMap{ x => { vector.flatMap { y => { if(x._1._2 == y._1) (x._1._1, x._2 * y._2) }}}}.reduceByKey((_,_) => _+_)
                                                                                         ^

我不确定RDD中的嵌套操作是否可行。Spark MLlib提供了执行矩阵和向量点积的API吗?


矩阵是对称的,因此我只使用了对角线上方的元素。 - victorming888
2个回答

3

我不确定RDD中的嵌套操作是否可行。

不可行。Spark不支持嵌套操作、转换或分布式数据结构。

Spark MLlib提供任何API来执行矩阵和向量之间的点积吗?

RowMatrix提供了一个multiply方法,可以接受一个本地矩阵。在您的情况下应该可以正常工作。

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

val idx = "^v([0-9]+)$".r

val rdd = sc.parallelize(List(
  (("v1", "v1"), 2), (("v1", "v2"), 4),
  (("v1", "v3"), 1), (("v2", "v2"), 5),
  (("v2", "v3"), 1), (("v3", "v3"), 2)
))

val mat = new CoordinateMatrix(rdd.map { case ((idx(i), idx(j)), v) => 
  MatrixEntry(i.toLong - 1, j.toLong - 1, v.toDouble)
}).toIndexedRowMatrix

val vector =  Matrices.dense(3, 1, Array(4.0, 1.0, 5.0))
mat.multiply(vector).rows

如果向量大小超出内存处理范围,您可以使用分块矩阵。请参考Apache Spark中的矩阵乘法
关于您的代码,您可以像这样做:
matrix
  .map{case ((i, j), v) => (j, (i, v))}
  .join(vector)
  .values
  .map{case ((i, v1), v2) => (i, v1 * v2)}
  .reduceByKey(_ + _)

或者使用本地的“向量”(可选择广播):
val vector = Map(("v1" -> 4), ("v2" -> 1), ("v3" -> 5)).withDefault(_ => 0)

matrix.map{case ((i, j), v) => (i, v * vector(j))}.reduceByKey(_ + _)

感谢您的澄清,zero323! - victorming888

2
假设你所说的“点积”就是普通的矩阵-向量乘法,你可以使用mllib.linalg包中的multiply方法。
val mlMat=Matrices.dense(3,2,matrix.collect().map(_._2.toDouble)).transpose
val mlVect=Vectors.dense(vector.collect().map(_._2.toDouble))
mlMat.multiply(mlVect)
//org.apache.spark.mllib.linalg.DenseVector = [17.0,31.0]

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接