当使用数据框架时,如何将限制谓词下推至Cassandra?

7

我有一个大的Cassandra表格。我想从Cassandra中仅加载50行。

以下是代码:

val ds = sparkSession.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> s"$Aggregates", "keyspace" -> s"$KeySpace"))
      .load()
      .where(col("aggregate_type") === "DAY")
      .where(col("start_time") <= "2018-03-28")
      .limit(50).collect()

以下代码使用了where方法的两个谓词,但没有使用limit方法。这样查询是否会获取全部数据(100万条记录)?如果不是,为什么加上limit(50)后代码的运行时间和未加限制的代码相差无几。

仅是一个猜测:如果实际上您的记录少于50条来满足谓词条件 - Spark必须在检查是否有更多数据之前遍历整个表格。 - dk14
@dk14 不是这样的,超过10k条记录符合谓词。 - addmeaning
“limit”默认情况下不会被翻译为CQL的limit。但是,如果您使用底层RDD进行操作,可以使用asInstanceOf[CassandraRDD],其中专门定义了适用于CQL的“limit”方法。请参见编辑后的答案。 - dk14
1个回答

6
与Spark Streaming不同,Spark本身试图尽可能快地预加载尽可能多的数据,以便能够并行处理它。因此,预加载是懒惰的,但在触发时会变得贪婪。然而,有一些针对cassandra-conector的特殊因素:
- 有效的“where”子句的自动谓词下推。 - 根据这个答案limit(...)没有被转换为CQL的LIMIT,因此其行为取决于在下载足够的数据后创建了多少获取作业。引用:
> 调用limit将允许Spark跳过从底层DataSource读取的某些部分。这将通过取消要执行的任务来限制从Cassandra读取的数据量。
可能的解决方案:
  • 通过限制numPartitions和数据交换速率(concurrent.reads和其他参数),可以部分地管理DataFrame的限制。如果您在大多数情况下都可以接受n ~ 50,则还可以限制类似where(dayIndex < 50 * factor * num_records)这样的内容。

  • 有一种方法可以通过SparkPartitionLimit设置CQL LIMIT,它直接影响每个CQL请求(了解更多)-请记住,请求是按spark-partition计算的。它在CassandraRdd扩展类中可用,因此您首先需要转换为RDD。

代码大概是这样的:

filteredDataFrame.rdd.asInstanceOf[CassandraRDD].limit(n).take(n).collect()

这将在每个CQL请求中附加LIMIT $N。与DataFrame的限制不同,如果您多次指定CassandraRDD limit.limit(10).limit(20)) - 只有最后一个会被附加。此外,我使用了n而不是n / numPartitions + 1,因为它(即使Spark和Cassandra分区是一对一的)可能会返回每个分区更少的结果。因此,我不得不添加take(n)以将<= numPartitions * n减少到n警告请仔细检查您的where是否可转换为CQL(使用explain())- 否则会在过滤之前应用LIMIT

P.S. 你也可以尝试直接使用sparkSession.sql(...)就像这里)来运行CQL,并比较结果。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接