假设我有一个RowMatrix。
- 如何转置它。API文档似乎没有转置方法。
- 矩阵有transpose()方法。但它不是分布式的。如果我有一个超过内存大小的大矩阵,怎么办?
我已经将RowMatrix转换为DenseMatrix,方法如下:
DenseMatrix Mat = new DenseMatrix(m,n,MatArr);
是否有其他方便的方法可以进行转换?
提前致谢
需要将RowMatrix转换为JavaRDD,再将JavaRDD转换为数组。
假设我有一个RowMatrix。
我已经将RowMatrix转换为DenseMatrix,方法如下:
DenseMatrix Mat = new DenseMatrix(m,n,MatArr);
是否有其他方便的方法可以进行转换?
提前致谢
需要将RowMatrix转换为JavaRDD,再将JavaRDD转换为数组。
如果有人感兴趣的话,我已经实现了@javadba提出的分布式版本。
def transposeRowMatrix(m: RowMatrix): RowMatrix = {
val transposedRowsRDD = m.rows.zipWithIndex.map{case (row, rowIndex) => rowToTransposedTriplet(row, rowIndex)}
.flatMap(x => x) // now we have triplets (newRowIndex, (newColIndex, value))
.groupByKey
.sortByKey().map(_._2) // sort rows and remove row indexes
.map(buildRow) // restore order of elements in each row and remove column indexes
new RowMatrix(transposedRowsRDD)
}
def rowToTransposedTriplet(row: Vector, rowIndex: Long): Array[(Long, (Long, Double))] = {
val indexedRow = row.toArray.zipWithIndex
indexedRow.map{case (value, colIndex) => (colIndex.toLong, (rowIndex, value))}
}
def buildRow(rowWithIndexes: Iterable[(Long, Double)]): Vector = {
val resArr = new Array[Double](rowWithIndexes.size)
rowWithIndexes.foreach{case (index, value) =>
resArr(index.toInt) = value
}
Vectors.dense(resArr)
}
BlockMatrix matA = (new IndexedRowMatrix(...).toBlockMatrix().cache();
matA.validate();
BlockMatrix matB = matA.transpose();
RowMatrix.transpose()
方法。您需要手动执行此操作。
以下是非分布式/本地矩阵版本:
def transpose(m: Array[Array[Double]]): Array[Array[Double]] = {
(for {
c <- m(0).indices
} yield m.map(_(c)) ).toArray
}
origMatRdd.rows.zipWithIndex.map{ case (rvect, i) =>
rvect.zipWithIndex.map{ case (ax, j) => ((j,(i,ax))
}.groupByKey
.sortBy{ case (i, ax) => i }
.foldByKey(new DenseVector(origMatRdd.numRows())) { case (dv, (ix,ax)) =>
dv(ix) = ax
}
注意事项:我没有测试过上述内容,它们会有错误。但是基本方法是有效的,并且与我以前为Spark开发的小型LinAlg库类似。
对于非常大且稀疏的矩阵(例如从文本特征提取得到的矩阵),最好最简单的方法是:
def transposeRowMatrix(m: RowMatrix): RowMatrix = {
val indexedRM = new IndexedRowMatrix(m.rows.zipWithIndex.map({
case (row, idx) => new IndexedRow(idx, row)}))
val transposed = indexedRM.toCoordinateMatrix().transpose.toIndexedRowMatrix()
new RowMatrix(transposed.rows
.map(idxRow => (idxRow.index, idxRow.vector))
.sortByKey().map(_._2))
}
val data = Array(
MllibVectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
MllibVectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
MllibVectors.dense(4.0, 0.0, 0.0, 6.0, 7.0),
MllibVectors.sparse(5, Seq((2, 2.0), (3, 7.0))))
val dataRDD = sc.parallelize(data, 4)
val testMat: RowMatrix = new RowMatrix(dataRDD)
testMat.rows.collect().map(_.toDense).foreach(println)
[0.0,1.0,0.0,7.0,0.0]
[2.0,0.0,3.0,4.0,5.0]
[4.0,0.0,0.0,6.0,7.0]
[0.0,0.0,2.0,7.0,0.0]
transposeRowMatrix(testMat).
rows.collect().map(_.toDense).foreach(println)
[0.0,2.0,4.0,0.0]
[1.0,0.0,0.0,0.0]
[0.0,3.0,0.0,2.0]
[7.0,4.0,6.0,7.0]
[0.0,5.0,7.0,0.0]
在Java中获取RowMatrix的转置:
public static RowMatrix transposeRM(JavaSparkContext jsc, RowMatrix mat){
List<Vector> newList=new ArrayList<Vector>();
List<Vector> vs = mat.rows().toJavaRDD().collect();
double [][] tmp=new double[(int)mat.numCols()][(int)mat.numRows()] ;
for(int i=0; i < vs.size(); i++){
double[] rr=vs.get(i).toArray();
for(int j=0; j < mat.numCols(); j++){
tmp[j][i]=rr[j];
}
}
for(int i=0; i < mat.numCols();i++)
newList.add(Vectors.dense(tmp[i]));
JavaRDD<Vector> rows2 = jsc.parallelize(newList);
RowMatrix newmat = new RowMatrix(rows2.rdd());
return (newmat);
}
def transpose(X: RowMatrix): RowMatrix = {
val m = X.numRows ().toInt
val n = X.numCols ().toInt
val transposed = X.rows.zipWithIndex.flatMap {
case (sp: SparseVector, i: Long) => sp.indices.zip (sp.values).map {case (j, value) => (i, j, value)}
case (dp: DenseVector, i: Long) => Range (0, n).toArray.zip (dp.values).map {case (j, value) => (i, j, value)}
}.sortBy (t => t._1).groupBy (t => t._2).map {case (i, g) =>
val (indices, values) = g.map {case (i, j, value) => (i.toInt, value)}.unzip
if (indices.size == m) {
(i, Vectors.dense (values.toArray) )
} else {
(i, Vectors.sparse (m, indices.toArray, values.toArray))
}
}.sortBy(t => t._1).map (t => t._2)
new RowMatrix (transposed)
}
希望这可以帮到你!