Spark:PySpark + Cassandra 查询性能

5

我已经在本地机器上(8核心,16GB内存)安装了Spark 2.0和Cassandra 3.0以进行测试,并按照以下方式编辑了spark-defaults.conf

spark.python.worker.memory 1g
spark.executor.cores 4
spark.executor.instances 4
spark.sql.shuffle.partitions 4

接下来我在Cassandra中导入了150万行数据:

test(
    tid int,
    cid int,
    pid int,
    ev list<double>,
    primary key (tid)
)

test.ev是一个包含数字值的列表,例如[2240,2081,159,304,1189,1125,1779,693,2187,1738,546,496,382,1761,680]

现在,在代码中,为了测试整个流程,我只需创建一个SparkSession,连接到Cassandra并进行简单的选择计数:

cassandra = spark.read.format("org.apache.spark.sql.cassandra")
df = cassandra.load(keyspace="testks",table="test")
df.select().count()

在这一点上,Spark输出了count,需要大约28秒来完成Job,分布在13个Tasks中(在Spark UI中,任务的总输入为331.6MB)。
问题:
- 这是预期的性能吗?如果不是,我错过了什么? - 理论上,DataFrame的分区数量确定了Spark将分发作业的任务数。如果我将spark.sql.shuffle.partitions设置为4,为什么会创建13个任务?(还确保通过对我的DataFrame调用rdd.getNumPartitions()来分区的数量)
更新:
我想测试的常见操作:
- 查询一个大数据集,比如从100,000 ~ N行按pid分组 - 选择ev,一个list<double> - 对每个成员执行平均值,假设现在每个列表都有相同的长度,即df.groupBy('pid').agg(avg(df['ev'][1])) 正如@zero323建议的那样,我部署了一台外部机器(2Gb RAM,4 cores,SSD),并装载了相同的数据集。 df.select().count()的结果是比我的先前测试预期更高的延迟和整体性能较差(需要约70秒才能完成Job)。
编辑:我误解了他的建议。 @zero323的意思是让Cassandra执行计数,而不是使用Spark SQL,如这里所述。
此外,我想指出,我知道为这种类型的数据设置list<double>而不是宽行的固有反模式,但我目前更关心检索大型数据集所花费的时间,而不是实际平均计算时间。

如果您想执行计数操作,则查询外部数据源的效率要高得多。总的来说,很多事情都取决于您所做的事情。关于分区,这里不使用spark.sql.shuffle.partitions。初始分区数由数据源设置,并且计数始终使用1个任务进行最终聚合。 - zero323
再次感谢@zero323。请检查我的更新。如果我理解正确,您是说分区的数量由Cassandra设置? - TMichel
好的,我认为我没有表达清楚:/ 我的意思是在执行简单操作(如计算所有行数)时,直接针对Cassandra执行查询,而不使用Spark SQL,而无需部署单独的服务器。 - zero323
关于分区数量,请查看 spark.cassandra.input.split.size_in_mb 参数以及它与总分区数的关系。此外,对于简单计数,请查看 Cassandra 支持的 RDD 上的 cassandraCount - zero323
2个回答

5

这是预期的表现吗?如果不是,我错过了什么?

看起来有点慢,但并不完全出乎意料。通常情况下,count 的表达方式为

SELECT 1 FROM table

紧接着是Spark端的求和。虽然进行了优化,但仍然相当低效,因为您需要从外部源获取N个长整数,仅需在本地对它们进行求和。

文档所述,Cassandra支持的RDD(不是Datasets)提供了经过优化的cassandraCount方法,可执行服务器端计数。

理论上,DataFrame的分区数量决定了Spark将在多少个任务中分发作业。如果我将spark.sql.shuffle.partitions设置为(...), 为什么还会创建(...)个任务呢?

这是因为此处未使用spark.sql.shuffle.partitions。此属性用于确定洗牌的分区数(当数据按某些键聚合时),而不是用于Dataset的创建或全局聚合(例如count(*),其始终使用1个分区进行最终聚合)。

如果您想控制初始分区的数量,可以查看spark.cassandra.input.split.size_in_mb,它定义了:

要获取到Spark分区的数据量。最终Spark分区的最小数量为1 + 2 * SparkContext.defaultParallelism

正如您所看到的,这里有另一个因素是spark.default.parallelism,但它并不是一个微妙的配置,在一般情况下依赖它并不是一个最优的选择。


1
那真的非常有说明性。谢谢你。 - TMichel

0

我看到这是一个很老的问题,但也许现在有人需要它。 在本地机器上运行Spark时,非常重要的是将SparkConf主节点设置为"local[*]",根据文档,这允许在您的机器上以与逻辑核心相同数量的工作线程运行Spark。

这帮助我将count()操作在本地机器上的性能提高了100%,与主节点"local"相比。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接