Spark中RowMatrix的矩阵转置

13

假设我有一个RowMatrix。

  1. 如何转置它。API文档似乎没有转置方法。
  2. 矩阵有transpose()方法。但它不是分布式的。如果我有一个超过内存大小的大矩阵,怎么办?
  3. 我已经将RowMatrix转换为DenseMatrix,方法如下:

    DenseMatrix Mat = new DenseMatrix(m,n,MatArr);
    

    是否有其他方便的方法可以进行转换?

    提前致谢

    需要将RowMatrix转换为JavaRDD,再将JavaRDD转换为数组。

6个回答

17

如果有人感兴趣的话,我已经实现了@javadba提出的分布式版本。

  def transposeRowMatrix(m: RowMatrix): RowMatrix = {
    val transposedRowsRDD = m.rows.zipWithIndex.map{case (row, rowIndex) => rowToTransposedTriplet(row, rowIndex)}
      .flatMap(x => x) // now we have triplets (newRowIndex, (newColIndex, value))
      .groupByKey
      .sortByKey().map(_._2) // sort rows and remove row indexes
      .map(buildRow) // restore order of elements in each row and remove column indexes
    new RowMatrix(transposedRowsRDD)
  }


  def rowToTransposedTriplet(row: Vector, rowIndex: Long): Array[(Long, (Long, Double))] = {
    val indexedRow = row.toArray.zipWithIndex
    indexedRow.map{case (value, colIndex) => (colIndex.toLong, (rowIndex, value))}
  }

  def buildRow(rowWithIndexes: Iterable[(Long, Double)]): Vector = {
    val resArr = new Array[Double](rowWithIndexes.size)
    rowWithIndexes.foreach{case (index, value) =>
        resArr(index.toInt) = value
    }
    Vectors.dense(resArr)
  } 

你能分享一下这个的 PySpark 版本吗? - Nikhil Baby
抱歉,我很久以前写的,现在已经有一段时间没有接触Spark了。但是适应它应该不难。 - ars

6
您可以使用BlockMatrix,该矩阵可以从IndexedRowMatrix创建:
BlockMatrix matA = (new IndexedRowMatrix(...).toBlockMatrix().cache();
matA.validate();

BlockMatrix matB = matA.transpose();

然后,可以轻松地将其放回为IndexedRowMatrix。这在spark文档中有描述。 (链接)

5
你说得没错:这里确实没有。
 RowMatrix.transpose()

方法。您需要手动执行此操作。

以下是非分布式/本地矩阵版本:

def transpose(m: Array[Array[Double]]): Array[Array[Double]] = {
    (for {
      c <- m(0).indices
    } yield m.map(_(c)) ).toArray
}

分布式版本将沿以下方式进行:
    origMatRdd.rows.zipWithIndex.map{ case (rvect, i) =>
        rvect.zipWithIndex.map{ case (ax, j) => ((j,(i,ax))
    }.groupByKey
    .sortBy{ case (i, ax) => i }
    .foldByKey(new DenseVector(origMatRdd.numRows())) { case (dv, (ix,ax))  =>
              dv(ix) = ax
     }

注意事项:我没有测试过上述内容,它们有错误。但是基本方法是有效的,并且与我以前为Spark开发的小型LinAlg库类似。


1
分布式矩阵转置是一个未解决的问题吗? - Chandan
2
据我所知,是这样的。例如,四月份向Spark用户列表发送有关大矩阵转置的电子邮件没有收到任何回复。 - WestCoastProjects
进一步检查发现,针对此问题以及相关的RowSimilarity和BlockMatrix操作,有未解决的JIRA。 - WestCoastProjects

3

对于非常大且稀疏的矩阵(例如从文本特征提取得到的矩阵),最好最简单的方法是:

def transposeRowMatrix(m: RowMatrix): RowMatrix = {
  val indexedRM = new IndexedRowMatrix(m.rows.zipWithIndex.map({
    case (row, idx) => new IndexedRow(idx, row)}))
  val transposed = indexedRM.toCoordinateMatrix().transpose.toIndexedRowMatrix()
  new RowMatrix(transposed.rows
    .map(idxRow => (idxRow.index, idxRow.vector))
    .sortByKey().map(_._2))      
}

对于非稀疏矩阵,您可以像aletapool在上面的答案中提到的那样使用BlockMatrix作为桥梁。
然而,aletapool的回答忽略了一个非常重要的问题:当您从RowMaxtrix -> IndexedRowMatrix -> BlockMatrix -> transpose -> BlockMatrix -> IndexedRowMatrix -> RowMatrix开始时,在最后一步(IndexedRowMatrix -> RowMatrix)中,您必须进行排序。因为默认情况下,从IndexedRowMatrix转换为RowMatrix时,索引会被简单地丢弃,并且顺序会被搞乱。
val data = Array(
  MllibVectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  MllibVectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  MllibVectors.dense(4.0, 0.0, 0.0, 6.0, 7.0),
  MllibVectors.sparse(5, Seq((2, 2.0), (3, 7.0))))

val dataRDD = sc.parallelize(data, 4)

val testMat: RowMatrix = new RowMatrix(dataRDD)
testMat.rows.collect().map(_.toDense).foreach(println)

[0.0,1.0,0.0,7.0,0.0]
[2.0,0.0,3.0,4.0,5.0]
[4.0,0.0,0.0,6.0,7.0]
[0.0,0.0,2.0,7.0,0.0]

transposeRowMatrix(testMat).
  rows.collect().map(_.toDense).foreach(println)

[0.0,2.0,4.0,0.0]
[1.0,0.0,0.0,0.0]
[0.0,3.0,0.0,2.0]
[7.0,4.0,6.0,7.0]
[0.0,5.0,7.0,0.0]

1

在Java中获取RowMatrix的转置:

public static RowMatrix transposeRM(JavaSparkContext jsc, RowMatrix mat){
List<Vector> newList=new ArrayList<Vector>();
List<Vector> vs = mat.rows().toJavaRDD().collect();
double [][] tmp=new double[(int)mat.numCols()][(int)mat.numRows()] ;

for(int i=0; i < vs.size(); i++){
    double[] rr=vs.get(i).toArray();
    for(int j=0; j < mat.numCols(); j++){
        tmp[j][i]=rr[j];
    }
}

for(int i=0; i < mat.numCols();i++)
    newList.add(Vectors.dense(tmp[i]));

JavaRDD<Vector> rows2 = jsc.parallelize(newList);
RowMatrix newmat = new RowMatrix(rows2.rdd());
return (newmat);
}

0
这是先前解决方案的一个变体,适用于稀疏行矩阵,并在需要时保持转置的稀疏性。
  def transpose(X: RowMatrix): RowMatrix = {
    val m = X.numRows ().toInt
    val n = X.numCols ().toInt

    val transposed = X.rows.zipWithIndex.flatMap {
      case (sp: SparseVector, i: Long) => sp.indices.zip (sp.values).map {case (j, value) => (i, j, value)}
      case (dp: DenseVector, i: Long) => Range (0, n).toArray.zip (dp.values).map {case (j, value) => (i, j, value)}
    }.sortBy (t => t._1).groupBy (t => t._2).map {case (i, g) =>
      val (indices, values) = g.map {case (i, j, value) => (i.toInt, value)}.unzip
      if (indices.size == m) {
        (i, Vectors.dense (values.toArray) )
      } else {
        (i, Vectors.sparse (m, indices.toArray, values.toArray))
      }
    }.sortBy(t => t._1).map (t => t._2)

    new RowMatrix (transposed)
  }

希望这可以帮到你!


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接