MongoDB Spark Connector - 聚合查询速度慢

5
我正在使用Spark应用程序和Mongos控制台运行相同的聚合管道。在控制台上,数据可以在眨眼之间获取,并且只需要使用“it”一次即可检索所有预期数据。 但是,根据Spark WebUI显示,Spark应用程序需要近两分钟的时间。

enter image description here

正如您所看到的,正在启动242个任务来获取结果。虽然MongoDB聚合只返回40个文档,但我不确定为什么会启动这么多任务。看起来存在很高的开销。
我在Mongos控制台上运行的查询:
db.data.aggregate([
   {
      $match:{
         signals:{
            $elemMatch:{
               signal:"SomeSignal",
               value:{
                  $gt:0,
                  $lte:100
               }
            }
         }
      }
   },
   {
      $group:{
         _id:"$root_document",
         firstTimestamp:{
            $min:"$ts"
         },
         lastTimestamp:{
            $max:"$ts"
         },
         count:{
            $sum:1
         }
      }
   }
])

Spark应用程序代码

    JavaMongoRDD<Document> rdd = MongoSpark.load(sc);

    JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
            Document.parse(
                    "{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"),
            Document.parse(
                    "{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }")));

    JavaRDD<String> outputRdd = aggregatedRdd.map(new Function<Document, String>() {
        @Override
        public String call(Document arg0) throws Exception {
            String output = String.format("%s;%s;%s;%s", arg0.get("_id").toString(),
                    arg0.get("firstTimestamp").toString(), arg0.get("lastTimestamp").toString(),
                    arg0.get("count").toString());
            return output;
        }
    });

    outputRdd.saveAsTextFile("/user/spark/output");

之后,我使用hdfs dfs -getmerge /user/spark/output/ output.csv并比较结果。为什么聚合如此缓慢?难道调用withPipeline不是为了减少需要传输到Spark的数据量吗?它看起来没有执行Mongos控制台执行的相同聚合操作。在Mongos控制台上,它非常快。我正在使用Spark 1.6.1和mongo-spark-connector_2.10版本1.1.0。编辑:我还想知道的另一件事是,两个执行器被启动(因为我目前正在使用默认执行设置),但只有一个执行器在工作。为什么第二个执行器没有任何工作?

enter image description here

编辑2:当使用不同的聚合管道并调用.count()而不是saveAsTextFile(..)时,也会创建242个任务。这次将返回65,000个文档。 输入图像描述

1
我会更深入地研究UI,以尝试理解这242个任务是什么。有40个文档,我想它们都可以适合单个分区。 - Ross
当我运行不同的查询并在 aggregatedRdd 上使用 .count() 而不是将其保存到 HDFS 时,也会创建242个任务。不同的查询返回几百万个文档。我的集合统计数据为: data : 15.01GiB docs : 45141000 chunks : 443。我怀疑把它写入 HDFS 不是问题所在。这只是在我的 Spark 应用程序中调用的唯一操作,这就是为什么它被列为 Web UI 中的唯一阶段。或者我错了吗? - j9dy
@Ross 我有点感觉聚合管道没有被执行。我需要特别执行聚合管道吗? - j9dy
@Ross 我刚刚运行了另一个聚合,只使用了这个管道:Document.parse("{ $match: {ts: {$gt: ISODate(\"2016-02-22T08:30:26.000Z\"), $lte: ISODate(\"2016-02-22T08:44:35.000Z\")} } }"),当我在rdd上调用.count()时,又创建了242个任务。你有什么想法吗?我已经在原帖中添加了另一张图片。 - j9dy
@Ross 我从度假回来了,并在聊天中添加了评论。感谢你尝试帮助我。 - j9dy
显示剩余2条评论
1个回答

3
大量任务的原因是默认的Mongo Spark分区策略。在计算分区时,它忽略了聚合管道,主要有两个原因:
  1. 减少计算分区的成本
  2. 确保分片和非分片分区器具有相同的行为
然而,正如您发现的那样,它们可能会生成空分区,在您的情况下代价高昂。
修复的选择可能是:
  1. 更改分区策略

    选择替代分区器以减少分区数量。例如,PaginateByCount会将数据库分成一组分区。

    创建自己的分区器-只需实现特性即可应用聚合管道并对结果进行分区。参见HalfwayPartitionercustom partitioner test的示例。

  2. 使用 $out 预聚合结果到集合中,然后从那里读取。

  3. 使用 coalesce(N) 将分区合并在一起,减少分区数量。
  4. 增加 spark.mongodb.input.partitionerOptions.partitionSizeMB 配置以生成较少的分区。

自定义分区器应该产生最佳解决方案,但有方法可以更好地利用可用的默认分区器。

如果您认为应该有一个默认的分区器,使用聚合管道来计算分区,请添加一个工单到 MongoDB Spark Jira 项目

1
我能否在哈希分片的集合中使用MongoShardedPartitioner?文档中写道:shardkey-该字段应该被索引并且包含唯一值。 在我的情况下,我有一个联合分片键,由我的字段log_file_name:day_of_timestamp:hour_of_timestamp组成,这样可以将相关数据保存在附近 - 至少我希望是这样。但是,预散列的值不是唯一的。文档是在谈论已经被哈希的值吗?另外,我有一个小问题,想请问如何在聊天中使用MongoSpark进行多个查询 -- 如果您不介意的话,请看看它。 - j9dy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接