Spark Cassandra Connector的keyBy和shuffling

5

我希望能够尽可能地避免洗牌来优化我的spark作业。

我正在使用cassandraTable来创建RDD。

由于列族的列名是动态的,因此它定义如下:

CREATE TABLE "Profile" (
  key text,
  column1 text,
  value blob,
  PRIMARY KEY (key, column1)
) WITH COMPACT STORAGE AND
  bloom_filter_fp_chance=0.010000 AND
  caching='ALL' AND
  ...

此定义将导致CassandraRow RDD元素以以下格式出现:
CassandraRow <key, column1, value>
  • key - 行键(RowKey)
  • column1 - column1的值是动态列的名称
  • value - 动态列的值

因此,如果我有一个RK='profile1',具有列名='George'和年龄='34',则生成的RDD将是:

CassandraRow<key=profile1, column1=name, value=George>
CassandraRow<key=profile1, column1=age, value=34>

然后我需要将具有相同键的元素分组在一起,以获得PairRdd:

PairRdd<String, Iterable<CassandraRow>>

重要的是要说,我需要分组的所有元素都在同一个Cassandra节点上(共享相同的行键),因此我希望连接器保持数据的本地性。
问题在于使用groupBy或groupByKey会导致数据混洗。我宁愿将它们本地分组,因为所有数据都在同一个节点上:
JavaPairRDD<String, Iterable<CassandraRow>> rdd = javaFunctions(context)
        .cassandraTable(ks, "Profile")
        .groupBy(new Function<ColumnFamilyModel, String>() {
            @Override
            public String call(ColumnFamilyModel arg0) throws Exception {
                return arg0.getKey();
            }
        })

我的问题是:

  1. 在RDD上使用keyBy会导致shuffle吗?还是它会将数据保留在本地?
  2. 有没有一种方法可以按键分组元素而不进行shuffle?我读到了关于mapPartitions的内容,但并没有完全理解它的用法。

谢谢,

Shai

1个回答

5

我认为您正在寻找 spanByKey,它是一个与cassandra-connector相关的操作,利用了cassandra提供的排序功能,允许在不产生shuffle阶段的情况下对元素进行分组。

在您的情况下,应该如下所示:

sc.cassandraTable("keyspace", "Profile")
  .keyBy(row => (row.getString("key")))
  .spanByKey

在文档中阅读更多信息:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md#grouping-rows-by-partition-key

该链接提供了有关如何按分区键分组行的详细信息。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接