Cassandra Spark 写入慢

5

我正在使用Spark Cassandra连接器和Python中的数据框架制作一个小型Spark应用,但我得到的写入速度非常慢。当我查看应用程序日志时,它显示:

17/03/28 20:04:05 INFO TableWriter: Wrote 315514 rows to movies.moviescores in 662.134 s.    

大约每秒处理474行数据。

我正在从Cassandra读取一些数据到表中,然后对它们进行一些操作(这也使得集合变得更大)。然后我将结果写回Cassandra(大约5000万行):

result.write.format("org.apache.spark.sql.cassandra").mode('append').options(table="moviescores", keyspace="movies").save()

其中结果是一个数据框。

以下是我的键空间的创建方式,如果有关系:

CREATE KEYSPACE IF NOT EXISTS movies WITH REPLICATION = { \'class\' : \'NetworkTopologyStrategy\', \'datacenter1\' : 3 };

我正在写入的表格:

CREATE TABLE IF NOT EXISTS movieScores(movieId1 int, movieId2 int, score int, PRIMARY KEY((movieId1, movieId2)));

我的设置如下: 我有5个Spark工作节点在不同的节点上运行,每个节点都在Digitalocean上以CoreOS为基础运行,具有2GB的RAM和2个内核。它们都在Docker容器中运行。 3个Cassandra节点在不同的节点上运行,每个节点都在Digitalocean上以CoreOS为基础运行,具有2GB的RAM和2个内核。它们也都在Docker容器中运行。
运行Spark的节点拥有2GB的RAM,但是它们只能使用1GB,因为这是Standalone模式下Spark的默认设置。
(default: your machine's total RAM minus 1 GB)

我不确定提出这个问题是否明智。

现在我已经阅读到应该在我的 Digital Ocean 集群中的每个节点上运行一个 Spark Worker 和一个 Cassandra 节点。但是,我不确定在只有 2GB 内存和 2 个内核的机器上同时运行一个带有 Spark 的 Docker 容器和另一个带有 Cassandra 节点的容器是否是个好主意。

为什么写入速度很慢?是否有一些参数/设置需要更改/设置才能增加写入速度?也许我的设置都是错的?我对 Spark 和 Cassandra 都很陌生。

更新:我刚刚在同一张表上进行了一个测试,没有使用 Spark,只是在我的笔记本电脑上使用 Python 的 Cassandra 连接器和一小段 Python 程序。我使用了批量插入,每次插入 1000 行,我只用了 35 秒就可以插入 100 万行,这几乎是每秒 30000 行,速度非常快。所以也许问题不是出在 Cassandra 上而是出在 Spark 上。也许在这里放置我代码的其余部分会有意义?或者也许我的设置存在问题?


这可能会对你有所帮助。虽然这是旧答案,但我会尝试更新或编写新答案,并提供最近发现的一些额外提示。 - Nachiket Kate
谢谢,但是我刚刚在同一张表上进行了一次测试,没有使用Spark,仅使用Python的Cassandra连接器和我的笔记本电脑上的一个小型Python程序。我使用批量插入每个批次1000行,只用了35秒就可以插入100万行数据,平均速度接近30000行/秒,要快得多。所以也许问题不在于Cassandra,而在于Spark。 - SilverTear
很好。为了验证Spark是否成为瓶颈,请尝试测量Spark的吞吐量。 - Nachiket Kate
非常抱歉如果这是一个愚蠢的问题,但最好的方法是什么?我正在运行一个Spark独立集群。此外,如果我进入Digitalocean上的仪表板,我可以看到每个节点的Spark节点大约有90%的CPU使用率。不确定这是否可以接受。 - SilverTear
我刚刚通过在本地使用单个工作节点运行Spark集群来进行了另一项测试,结果速度同样很慢,因此问题似乎不在云设置中。我如何才能直接以每秒30k条记录的速度写入Cassandra,但一旦使用Spark和Cassandra连接器就变得缓慢呢? - SilverTear
1个回答

0

最近我在将超过8000万条记录持久化到Cassandra时遇到了类似的问题。在我的情况下,我使用了Spark Java API。帮助我解决问题的是,在通过spark-cassandra-connector保存数据集到Cassandra之前,我对数据集应用了orderBy()方法。尝试先对你的数据集进行排序,然后再使用save()方法保存到Cassandra。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接