Spark SQL + Cassandra: poor performance


I've just started using Spark SQL + Cassandra, and I'm probably missing something important, but one simple query takes about 45 seconds. I'm using the spark-cassandra-connector library and running a local web server that also hosts Spark. So my setup is roughly this:

In sbt:

    "org.apache.spark" %% "spark-core" % "1.4.1" excludeAll(ExclusionRule(organization = "org.slf4j")),
    "org.apache.spark" %% "spark-sql" % "1.4.1" excludeAll(ExclusionRule(organization = "org.slf4j")),
    "com.datastax.spark" %% "spark-cassandra-connector" % "1.4.0-M3" excludeAll(ExclusionRule(organization = "org.slf4j"))

I have a singleton object that holds a SparkContext and a CassandraSQLContext, which is then used from the servlets. Here is the code of the singleton object:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.cassandra.CassandraSQLContext

    object SparkModel {

      val conf =
        new SparkConf()
          .setAppName("core")
          .setMaster("local")
          .set("spark.cassandra.connection.host", "127.0.0.1")

      val sc = new SparkContext(conf)
      val sqlC = new CassandraSQLContext(sc)
      sqlC.setKeyspace("core")

      val df: DataFrame = sqlC.cassandraSql(
        "SELECT email, target_entity_id, target_entity_type " +
        "FROM tracking_events " +
        "LEFT JOIN customers " +
        "WHERE entity_type = 'User' AND entity_id = customer_id")
    }

And here is how I use it:

get("/spark") {
  SparkModel.df.collect().map(r => TrackingEvent(r.getString(0), r.getString(1), r.getString(2))).toList
}

Cassandra, Spark, and the web application all run in a virtual machine on my MacBook Pro. The Cassandra query itself takes 10-20 milliseconds.

When I call this endpoint for the first time, it takes 70-80 seconds to return the result. Subsequent queries take about 45 seconds. The log looks like this:

    12:48:50 INFO  org.apache.spark.SparkContext - Starting job: collect at V1Servlet.scala:1146
    12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Got job 1 (collect at V1Servlet.scala:1146) with 1 output partitions (allowLocal=false)
    12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Final stage: ResultStage 1(collect at V1Servlet.scala:1146)
    12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Parents of final stage: List()
    12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Missing parents: List()
    12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Submitting ResultStage 1 (MapPartitionsRDD[29] at collect at V1Servlet.scala:1146), which has no missing parents
    12:48:50 INFO  org.apache.spark.storage.MemoryStore - ensureFreeSpace(18696) called with curMem=26661, maxMem=825564856
    12:48:50 INFO  org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values in memory (estimated size 18.3 KB, free 787.3 MB)
    12:48:50 INFO  org.apache.spark.storage.MemoryStore - ensureFreeSpace(8345) called with curMem=45357, maxMem=825564856
    12:48:50 INFO  org.apache.spark.storage.MemoryStore - Block broadcast_1_piece0 stored as bytes in memory (estimated size 8.1 KB, free 787.3 MB)
    12:48:50 INFO  o.a.spark.storage.BlockManagerInfo - Added broadcast_1_piece0 in memory on localhost:56289 (size: 8.1 KB, free: 787.3 MB)
    12:48:50 INFO  org.apache.spark.SparkContext - Created broadcast 1 from broadcast at DAGScheduler.scala:874
    12:48:50 INFO  o.a.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[29] at collect at V1Servlet.scala:1146)
    12:48:50 INFO  o.a.s.scheduler.TaskSchedulerImpl - Adding task set 1.0 with 1 tasks
    12:48:50 INFO  o.a.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 1.0 (TID 1, localhost, NODE_LOCAL, 59413 bytes)
    12:48:50 INFO  org.apache.spark.executor.Executor - Running task 0.0 in stage 1.0 (TID 1)
    12:48:50 INFO  com.datastax.driver.core.Cluster - New Cassandra host localhost/127.0.0.1:9042 added
    12:48:50 INFO  c.d.s.c.cql.CassandraConnector - Connected to Cassandra cluster: Super Cluster
    12:49:11 INFO  o.a.spark.storage.BlockManagerInfo - Removed broadcast_0_piece0 on localhost:56289 in memory (size: 8.0 KB, free: 787.3 MB)
    12:49:35 INFO  org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 6124 bytes result sent to driver
    12:49:35 INFO  o.a.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 1.0 (TID 1) in 45199 ms on localhost (1/1)
    12:49:35 INFO  o.a.s.scheduler.TaskSchedulerImpl - Removed TaskSet 1.0, whose tasks have all completed, from pool
    12:49:35 INFO  o.a.spark.scheduler.DAGScheduler - ResultStage 1 (collect at V1Servlet.scala:1146) finished in 45.199 s

As you can see from the log, the longest pauses fall between these three lines (21 + 24 seconds):

    12:48:50 INFO  c.d.s.c.cql.CassandraConnector - Connected to Cassandra cluster: Super Cluster
    12:49:11 INFO  o.a.spark.storage.BlockManagerInfo - Removed broadcast_0_piece0 on localhost:56289 in memory (size: 8.0 KB, free: 787.3 MB)
    12:49:35 INFO  org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 6124 bytes result sent to driver

Clearly I'm doing something wrong. What is it, and how can I improve this?

EDIT: An important addition: the tables are small (about 200 entries in tracking_events and about 20 in customers), so reading them entirely into memory shouldn't take any noticeable time. Also, this is a local Cassandra installation with no cluster and no network involved.

1 Answer

  "SELECT email, target_entity_id, target_entity_type " +
    "FROM tracking_events " +
    "LEFT JOIN customers " +
    "WHERE entity_type = 'User' AND entity_id = customer_id")

This query will read all of the data from both the tracking_events and customers tables. I would compare its performance against just doing a SELECT COUNT(*) on both tables. If the difference is significant then there may be an issue, but my guess is that this is simply how long it takes to read both tables fully.
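
For a quick baseline, here is a minimal sketch of that comparison, reusing the sqlC and df from the question's SparkModel (the time helper is an illustrative addition, not from the original post):

    // Crude timing helper (illustrative only).
    def time[T](label: String)(block: => T): T = {
      val start = System.nanoTime()
      val result = block
      println(s"$label took ${(System.nanoTime() - start) / 1e6} ms")
      result
    }

    // Baseline: a bare count of each table.
    time("count tracking_events") {
      SparkModel.sqlC.cassandraSql("SELECT COUNT(*) FROM tracking_events").collect()
    }
    time("count customers") {
      SparkModel.sqlC.cassandraSql("SELECT COUNT(*) FROM customers").collect()
    }

    // The full join from the question, for comparison.
    time("full join") { SparkModel.df.collect() }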

There are a few knobs for tuning how reads are performed, and since the defaults are oriented towards much bigger datasets, you may want to change them:

| Parameter | What it controls | Default |
| --- | --- | --- |
| spark.cassandra.input.split.size_in_mb | approx. amount of data to be fetched into a Spark partition | 64 MB |
| spark.cassandra.input.fetch.size_in_rows | number of CQL rows fetched per driver request | 1000 |

I would make sure you are generating at least as many tasks as you have cores, so that all of your resources are used; to do that, shrink the input split size. The fetch size controls how many rows are paged in at a time by an executor core, so increasing it can improve speed in some use cases.
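
A minimal sketch of how these two knobs could be applied to the SparkConf from the question; the concrete values are illustrative assumptions, not tested recommendations:

    val conf =
      new SparkConf()
        .setAppName("core")
        .setMaster("local")
        .set("spark.cassandra.connection.host", "127.0.0.1")
        // Shrink splits so even a small read produces enough tasks
        // to occupy every core (default is 64 MB per partition).
        .set("spark.cassandra.input.split.size_in_mb", "1")
        // Page more CQL rows per request (default is 1000); this can
        // help in some read-heavy use cases.
        .set("spark.cassandra.input.fetch.size_in_rows", "10000")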

I can't spin up my Cassandra instance right now, but the important point is that both tables are small: tracking_events has about 200 entries and customers only about 20. It can't be the data loading that takes this long. - Haspemulator

Then it looks like it's Spark's repartitioning that is the slow part. There isn't much we can do to fix that beyond minimizing the default parallelism, since the job is small. If you can check the UI, it may give you an idea of how many tasks are created during the repartition events. For a job this small you probably only want 1, so set the default shuffle parallelism to 1; spark.default.parallelism is the config parameter for that (see the sketch after these comments). - RussS

Fair enough, but then I think I was completely misled about Spark's suitability for real-time querying. That's actually quite sad. I need something that responds within one second at most. - Haspemulator

Spark isn't really meant for real-time querying; it's more of a batch analytics framework. Perhaps what you're looking for is something like Solr or ElasticSearch? - RussS

For setting spark.cassandra.input.split.size_in_mb in Spark, see [this link](https://dev59.com/xo3da4cB1Zd3GeqPyFCh#31586690). - karmadip dodiya
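
To make the parallelism suggestion above concrete, here is a minimal sketch against the question's SparkModel. Note that spark.sql.shuffle.partitions, which controls how many tasks a Spark SQL join shuffles into (200 by default), is an addition here and is not named in the comments:

    val conf =
      new SparkConf()
        .setAppName("core")
        .setMaster("local")
        .set("spark.cassandra.connection.host", "127.0.0.1")
        // One partition is plenty for ~200 rows, as suggested above.
        .set("spark.default.parallelism", "1")

    val sc = new SparkContext(conf)
    val sqlC = new CassandraSQLContext(sc)
    sqlC.setKeyspace("core")
    // Spark SQL joins shuffle into spark.sql.shuffle.partitions tasks;
    // the default of 200 creates 200 mostly-empty tasks for this job.
    // (This setting is an assumption, not taken from the thread.)
    sqlC.setConf("spark.sql.shuffle.partitions", "1")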
