我希望能够尽可能地避免洗牌来优化我的spark作业。
我正在使用cassandraTable来创建RDD。
由于列族的列名是动态的,因此它定义如下:
CREATE TABLE "Profile" (
key text,
column1 text,
value blob,
PRIMARY KEY (key, column1)
) WITH COMPACT STORAGE AND
bloom_filter_fp_chance=0.010000 AND
caching='ALL' AND
...
此定义将导致CassandraRow RDD元素以以下格式出现:
CassandraRow <key, column1, value>
- key - 行键(RowKey)
- column1 - column1的值是动态列的名称
- value - 动态列的值
因此,如果我有一个RK='profile1',具有列名='George'和年龄='34',则生成的RDD将是:
CassandraRow<key=profile1, column1=name, value=George>
CassandraRow<key=profile1, column1=age, value=34>
然后我需要将具有相同键的元素分组在一起,以获得PairRdd:
PairRdd<String, Iterable<CassandraRow>>
重要的是要说,我需要分组的所有元素都在同一个Cassandra节点上(共享相同的行键),因此我希望连接器保持数据的本地性。
问题在于使用groupBy或groupByKey会导致数据混洗。我宁愿将它们本地分组,因为所有数据都在同一个节点上:
JavaPairRDD<String, Iterable<CassandraRow>> rdd = javaFunctions(context)
.cassandraTable(ks, "Profile")
.groupBy(new Function<ColumnFamilyModel, String>() {
@Override
public String call(ColumnFamilyModel arg0) throws Exception {
return arg0.getKey();
}
})
我的问题是:
- 在RDD上使用keyBy会导致shuffle吗?还是它会将数据保留在本地?
- 有没有一种方法可以按键分组元素而不进行shuffle?我读到了关于mapPartitions的内容,但并没有完全理解它的用法。
谢谢,
Shai