我有一个Spark Streaming应用程序,每秒接收多个JSON消息,每个消息都有一个ID来标识它们的来源。
使用此ID作为键,我能够执行MapPartitionsToPair,从而创建一个JavaPairDStream,其中包含一个RDD的键值对,每个分区一个键值对(例如,如果我收到5个JSON消息,那么我将得到一个带有5个分区的RDD,每个分区的键是消息的ID,值是JSON消息本身)。
现在,我想将所有具有相同键的值分组到同一个分区中。因此,例如,如果我有3个键为'a'的分区和2个键为'b'的分区,我想创建一个新的RDD,其中有2个分区而不是5个分区,每个分区包含一个键的所有值,一个分区为'a',另一个为'b'。
如何实现这一点? 以下是我的代码:
使用此ID作为键,我能够执行MapPartitionsToPair,从而创建一个JavaPairDStream,其中包含一个RDD的键值对,每个分区一个键值对(例如,如果我收到5个JSON消息,那么我将得到一个带有5个分区的RDD,每个分区的键是消息的ID,值是JSON消息本身)。
现在,我想将所有具有相同键的值分组到同一个分区中。因此,例如,如果我有3个键为'a'的分区和2个键为'b'的分区,我想创建一个新的RDD,其中有2个分区而不是5个分区,每个分区包含一个键的所有值,一个分区为'a',另一个为'b'。
如何实现这一点? 以下是我的代码:
JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
StorageLevels.MEMORY_AND_DISK_SER);
JavaPairDStream<String,String> streamGiveKey= streamData2.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
@Override
public Iterable<Tuple2<String, String>> call(Iterator<String> stringIterator) throws Exception {
ArrayList<Tuple2<String,String>>a= new ArrayList<Tuple2<String, String>>();
while (stringIterator.hasNext()){
String c=stringIterator.next();
if(c==null){
return null;
}
JsonMessage retMap = new Gson().fromJson(c,JsonMessage.class);
String key= retMap.getSid();
Tuple2<String,String> b= new Tuple2<String,String>(key,c);
a.add(b);
System.out.print(b._1+"_"+b._2);
// }
//break;
}
return a;
}
});
//我创建了一个JavaPairDStream,其中每个分区都包含一个键/值对。
我尝试使用groupByKey()
,但无论消息数量如何,我始终得到2个分区的数量。
我该怎么办? 非常感谢。