我已经在本地机器上(8核心,16GB内存)安装了Spark 2.0和Cassandra 3.0以进行测试,并按照以下方式编辑了spark-defaults.conf
:
spark.python.worker.memory 1g
spark.executor.cores 4
spark.executor.instances 4
spark.sql.shuffle.partitions 4
接下来我在Cassandra中导入了150万行数据:
test(
tid int,
cid int,
pid int,
ev list<double>,
primary key (tid)
)
test.ev
是一个包含数字值的列表,例如[2240,2081,159,304,1189,1125,1779,693,2187,1738,546,496,382,1761,680]
现在,在代码中,为了测试整个流程,我只需创建一个SparkSession
,连接到Cassandra并进行简单的选择计数:
cassandra = spark.read.format("org.apache.spark.sql.cassandra")
df = cassandra.load(keyspace="testks",table="test")
df.select().count()
在这一点上,Spark输出了
count
,需要大约28秒来完成Job
,分布在13个Tasks
中(在Spark UI
中,任务的总输入为331.6MB)。问题:
- 这是预期的性能吗?如果不是,我错过了什么? - 理论上,DataFrame的分区数量确定了Spark将分发作业的任务数。如果我将
spark.sql.shuffle.partitions
设置为4,为什么会创建13个任务?(还确保通过对我的DataFrame调用rdd.getNumPartitions()
来分区的数量)更新:
我想测试的常见操作:
- 查询一个大数据集,比如从100,000 ~ N行按
pid
分组
- 选择ev
,一个list<double>
- 对每个成员执行平均值,假设现在每个列表都有相同的长度,即df.groupBy('pid').agg(avg(df['ev'][1]))
正如@zero323建议的那样,我部署了一台外部机器(2Gb RAM,4 cores,SSD),并装载了相同的数据集。 df.select().count()
的结果是比我的先前测试预期更高的延迟和整体性能较差(需要约70秒才能完成Job
)。编辑:我误解了他的建议。 @zero323的意思是让Cassandra执行计数,而不是使用Spark SQL,如这里所述。
此外,我想指出,我知道为这种类型的数据设置
list<double>
而不是宽行的固有反模式,但我目前更关心检索大型数据集所花费的时间,而不是实际平均计算时间。
spark.sql.shuffle.partitions
。初始分区数由数据源设置,并且计数始终使用1个任务进行最终聚合。 - zero323spark.cassandra.input.split.size_in_mb
参数以及它与总分区数的关系。此外,对于简单计数,请查看 Cassandra 支持的 RDD 上的cassandraCount
。 - zero323