Apache Spark如何在内存中工作?

3
当在where子句中使用非索引列查询Cassandra时,Spark-Cassandra-Connector的官方文档指出:
要过滤行,可以使用Spark提供的过滤变换。然而,这种方法会导致从Cassandra获取所有行,然后由Spark进行过滤。
我有点困惑。例如,如果我有十亿行数据,结构如下:ID、City、State和Country,其中只有ID是索引的。如果我在where子句中使用City='Chicago',那么Spark首先会下载所有十亿行数据,然后再过滤出City='Chicago'的行吗?还是它会从Cassandra读取一些数据块,运行过滤器,存储符合条件的行,然后获取更多的数据块,再次获取符合条件的行,并将其设置为待处理的行…继续这个过程。如果在任何时候,RAM或磁盘存储空间不足,则删除/卸载/丢弃未匹配条件的数据,并获取新的数据块以继续该过程?
另外,有人能告诉我一个通用公式来计算保存十亿行数据中一个BigDecimal列和3个文本列需要多少磁盘空间吗?
2个回答

4

筛选行可以在数据库中或在Spark中进行。文档建议尽可能在数据库中筛选记录,而不是在Spark中进行。这意味着:

sc.cassandraTable("test", "cars")
  .select("id", "model")
  .where("color = ?", "black")

上述语句将在数据库Cassandra中运行'color = 'black''过滤器,因此Spark不会将除黑色以外颜色的记录加载到内存中。Spark可能仅加载在'color'列中具有黑色值的少数百万记录,而不是将十亿条记录全部加载到内存中。
相反,可以在Spark中进行过滤:
sc.cassandraTable("test", "cars")
  .select("id", "model")
  .filter(car -> "black".equals(car.getColor()))

这个最新版本将数十亿条记录全部加载到Spark的内存中,然后在Spark中按颜色进行过滤。显然,这不能优先于前一个版本,该版本将需要的内存量最小化为Spark集群。因此,对于任何可以在数据库中处理的简单过滤,应使用数据库/驱动程序/查询过滤器。
关于估算内存需求,已经有其他问题提出了各种方法,请查看此链接此链接. 在Spark文档中也有很好的建议:
“您需要多少内存取决于您的应用程序。为确定您的应用程序在某个数据集大小上使用了多少内存,请将其部分加载到Spark RDD中,并使用Spark监视UI(http://:4040)的存储选项卡查看其内存大小。请注意,存储级别和序列化格式极大地影响内存使用情况-请参阅调整指南以获取有关如何减少内存使用的提示。”

@FarazDurrani 没错。Cassandra的主键/分区键相关查询规则仍然适用。我假设按该字段过滤是合法的。 - ernest_k
好的。让我们明确两件事情。首先,在数据库中进行过滤只能通过使用 .where(cql predicate)(而不是.filter)。其次,您对于 allow filtering 是正确的(即使连接器可能仍然会遇到数据库错误-文档说Cassandra引擎不允许所有谓词)。最终,您作为数据所有者将决定运行查询的有效方式以及可以调整哪些内容(索引,或者在Spark中运行过滤器)... - ernest_k
@FarazDurrani 我也在澄清自己的很多困惑 :-). 我想我也想表达这样一个观点:最好使查询确定性,并在索引列上进行数据库过滤。我个人更喜欢偏向于数据库保证的一面,而不是允许未来可能会改变的特性。 - ernest_k
感谢您的推荐。 - Faraz
你能详细解释一下 不是所有谓词都被Cassandra引擎所允许 的意思吗?请结合在where子句中查询未索引的列进行解释。 - Faraz
显示剩余5条评论

2

Spark Cassandra连接器将发出多个查询(每个Spark任务1个),并使用特定的令牌范围。因此,它将进行全表扫描,但是会逐位地并行执行。如果您在每个Cassandra节点上运行一个Spark工作程序,则连接器将选择与本地Cassandra节点匹配的令牌范围。这将限制网络上的数据洗牌。然而,进行全表扫描并不理想。


即使是全表扫描,最终会存储哪些数据到我的内存中呢?符合条件的行对吧? - Faraz

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接