在Apache Spark上迭代运行查询

3
我一直在试图在一个相对较大的数据集上执行10,000个查询,容量为11M。更具体地说,我正在尝试使用过滤器来转换RDD,基于一些断言,并通过应用COUNT动作计算有多少记录符合该过滤器。
我在拥有16GB内存和8核CPU的本地计算机上运行Apache Spark。我已经将--driver-memory设置为10G,以便将RDD缓存在内存中。
然而,由于我必须重复执行这个操作10,000次,所以完成这个任务需要异常长的时间。我也附上了我的代码,希望它能让事情更清晰。

加载要查询的查询和数据框。

//load normalized dimensions
val df = spark.read.parquet("/normalized.parquet").cache()
//load query ranges
val rdd = spark.sparkContext.textFile("part-00000")

查询的并行执行

在这里,我的查询被收集在一个列表中,并使用par并行执行。然后,我会收集需要用来过滤数据集的查询所需参数。 isWithin函数调用一个函数,并测试数据集中包含的向量是否在查询指定的边界内。

现在,在对数据集进行过滤之后,我执行计数操作以获取过滤数据集中存在的记录数,然后创建一个字符串报告它们的数量。

val results = queries.par.map(q => {
  val volume = q(q.length-1)
  val dimensions = q.slice(0, q.length-1)
  val count = df.filter(row => {
    val v = row.getAs[DenseVector]("scaledOpen")
    isWithin(volume, v, dimensions)
  }).count
  q.mkString(",")+","+count
})

现在,我想说的是,由于数据集非常大,尝试在单台计算机上运行此任务通常非常困难。我知道使用Spark或利用索引可以使其更快,但我想知道是否有办法让它更快。

1个回答

1

尽管您将本地集合的访问并行化,但这并不意味着任何内容都是并行执行的。可以同时执行的作业数量受到群集资源而非驱动程序代码的限制。

与此同时,Spark专为高延迟批处理作业而设计。如果作业数达到数万个,您就不能指望事情变得快速。

您可以尝试的一件事是将筛选器下推到单个作业中。将DataFrame转换为RDD

import org.apache.spark.mllib.linalg.{Vector => MLlibVector}
import org.apache.spark.rdd.RDD

val vectors: RDD[org.apache.spark.mllib.linalg.DenseVector] = df.rdd.map(
  _.getAs[MLlibVector]("scaledOpen").toDense
)

将向量映射为 {0, 1} 指示器:

map

import breeze.linalg.DenseVector

// It is not clear what is the type of queries
type Q = ???
val queries: Seq[Q] = ???

val inds: RDD[breeze.linalg.DenseVector[Long]] = vectors.map(v => {
  //  Create {0, 1} indicator vector
  DenseVector(queries.map(q => {
    // Define as before
    val volume = ???
    val dimensions = ???

    // Output 0 or 1 for each q
    if (isWithin(volume, v, dimensions)) 1L else 0L
  }): _*)
})

aggregate部分结果:

val counts: breeze.linalg.DenseVector[Long] = inds
  .aggregate(DenseVector.zeros[Long](queries.size))(_ += _, _ += _)

并准备最终输出:

queries.zip(counts.toArray).map {
  case (q, c) => s"""${q.mkString(",")},$c"""
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接