如何将Spark RDD转换为Mahout DRM?

3

我正在使用sc.textFile()从Alluxio中获取数据,但它是spark RDD。我的程序进一步将此spark RDD用作Mahout DRM,因此我需要将RDD转换为DRM。因此,我的当前代码保持稳定。

2个回答

2

可以按照以下步骤从Apache Spark RDD创建Apache Mahout DRM:

  1. 将RDD的每一行转换为Mahout向量
  2. 使用索引对RDD进行压缩(并交换元组以形成(Long, Vector)的形式)
  3. 使用DRM包装RDD。

考虑以下示例代码:

val rddA = sc.parallelize(Array((1.0, 2.0, 3.0),
            ( 2.0, 3.0, 4.0),
            ( 4.0, 5.0, 6.0)))

val drmRddA: DrmRdd[Long] = rddA.map(a => new DenseVector(a))
                 .zipWithIndex()
                 .map(t => (t._2, t._1))

val drmA = drmWrap(rdd= drmRddA)

来源/更多信息/无耻的自我推销(向下滚动):我的博客


1
转换数据的主要问题通常是Mahout使用整数引用一般矩阵的行和列号,但数据通常具有自己的行和列键,这些键是某种字符串ID。Mahout有一个名为IndexedDatasetSpark的对象,它在BiMaps(实际上是BiDictionaries)中保留了ID,但也创建了一个Mahout DRM。好处是字典将在完成数学运算后将行和列的整数转换回您的ID。如果您有一个元素为矩阵的RDD [String,String],则会进行此转换。如果您有一系列行,则可以从此开始编写自己的转换代码。

https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala#L75


请参考以下代码片段,了解如何将RDD转换为IDS: this gist - rawkintrevo

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接