Flink HBase输入用于机器学习算法

3
我希望使用Flink-HBase插件读取数据,该数据将作为Flink机器学习算法(SVM和MLR)的输入。目前,我首先将提取的数据写入临时文件,然后通过libSVM方法读取它,但我认为应该有更高级的方法。您有代码片段或想法吗?

Flink是一个相对较新的项目。我认为,您可能会在Flink邮件列表上获得更好的帮助。 - Anil Gupta
1个回答

3
不需要将数据写入磁盘,然后使用MLUtils.readLibSVM进行读取。原因如下。 MLUtils.readLibSVM 期望的是一个文本文件,其中每一行都是稀疏特征向量及其关联标签。它采用以下格式表示标签-特征向量对:
<line> .=. <label> <feature>:<value> <feature>:<value> ... <feature>:<value> # <info>

其中<feature>是特征向量中后续value的索引。 MLUtils.readLibSVM可以读取这种格式的文件,并将每行转换为LabeledVector实例。因此,在读取libSVM文件后,您将获得一个DataSet[LabeledVector]。这正是您需要用于SVMMultipleLinearRegression预测器的输入格式。

但是,根据从HBase获取的数据格式,您首先必须将数据转换为libSVM格式。否则,MLUtils.readLibSVM将无法读取写入的文件。如果您转换数据,则还可以直接将数据转换为DataSet[LabeledVector]并将其用作Flink的ML算法的输入。这避免了不必要的磁盘循环。

如果您从HBase获取一个DataSet[String],其中每个字符串都具有libSVM格式(请参见上面的规范),那么您可以在HBaseDataSet上应用以下映射函数的map操作。

val hbaseInput: DataSet[String] = ...
val labelCOODS = hbaseInput.flatMap {
  line =>
    // remove all comments which start with a '#'
    val commentFreeLine = line.takeWhile(_ != '#').trim

    if(commentFreeLine.nonEmpty) {
      val splits = commentFreeLine.split(' ')
      val label = splits.head.toDouble
      val sparseFeatures = splits.tail
      val coos = sparseFeatures.map {
        str =>
          val pair = str.split(':')
          require(
            pair.length == 2, 
            "Each feature entry has to have the form <feature>:<value>")

          // libSVM index is 1-based, but we expect it to be 0-based
          val index = pair(0).toInt - 1
          val value = pair(1).toDouble

          (index, value)
      }

      Some((label, coos))
    } else {
      None
    }

// Calculate maximum dimension of vectors
val dimensionDS = labelCOODS.map {
  labelCOO =>
    labelCOO._2.map( _._1 + 1 ).max
}.reduce(scala.math.max(_, _))

val labeledVectors: DataSet[LabeledVector] = 
  labelCOODS.map{ new RichMapFunction[(Double, Array[(Int, Double)]), LabeledVector] {
  var dimension = 0

  override def open(configuration: Configuration): Unit = {
    dimension = getRuntimeContext.getBroadcastVariable(DIMENSION).get(0)
  }

  override def map(value: (Double, Array[(Int, Double)])): LabeledVector = {
    new LabeledVector(value._1, SparseVector.fromCOO(dimension, value._2))
  }
}}.withBroadcastSet(dimensionDS, DIMENSION)

这将把你的libSVM格式数据转换为一个 LabeledVectors 数据集。

谢谢!你的回答非常有帮助!不幸的是,HBase的数据集必须在Java类中获取,现在我遇到了错误,我的DataSet与Scala类中的方法不兼容:错误:(102,29)java:不兼容的类型:'org.apache.flink.api.java.DataSet<java.lang.String>不能转换为org.apache.flink.api.scala.DataSet<java.lang.String>' - MsIcklerly
您还应该能够使用Scala API从HBase中读取数据。然后,您将获得一个org.apache.flink.api.scala.Dataset[String] - Till Rohrmann

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接