我一直在试图在一个相对较大的数据集上执行10,000个查询,容量为11M。更具体地说,我正在尝试使用过滤器来转换RDD,基于一些断言,并通过应用COUNT动作计算有多少记录符合该过滤器。
我在拥有16GB内存和8核CPU的本地计算机上运行Apache Spark。我已经将--driver-memory设置为10G,以便将RDD缓存在内存中。
然而,由于我必须重复执行这个操作10,000次,所以完成这个任务需要异常长的时间。我也附上了我的代码,希望它能让事情更清晰。
我在拥有16GB内存和8核CPU的本地计算机上运行Apache Spark。我已经将--driver-memory设置为10G,以便将RDD缓存在内存中。
然而,由于我必须重复执行这个操作10,000次,所以完成这个任务需要异常长的时间。我也附上了我的代码,希望它能让事情更清晰。
加载要查询的查询和数据框。
//load normalized dimensions
val df = spark.read.parquet("/normalized.parquet").cache()
//load query ranges
val rdd = spark.sparkContext.textFile("part-00000")
查询的并行执行
在这里,我的查询被收集在一个列表中,并使用par并行执行。然后,我会收集需要用来过滤数据集的查询所需参数。 isWithin函数调用一个函数,并测试数据集中包含的向量是否在查询指定的边界内。
现在,在对数据集进行过滤之后,我执行计数操作以获取过滤数据集中存在的记录数,然后创建一个字符串报告它们的数量。
val results = queries.par.map(q => {
val volume = q(q.length-1)
val dimensions = q.slice(0, q.length-1)
val count = df.filter(row => {
val v = row.getAs[DenseVector]("scaledOpen")
isWithin(volume, v, dimensions)
}).count
q.mkString(",")+","+count
})
现在,我想说的是,由于数据集非常大,尝试在单台计算机上运行此任务通常非常困难。我知道使用Spark或利用索引可以使其更快,但我想知道是否有办法让它更快。