背景: 我正在使用Apache Spark来对日志中不同事件类型的运行次数进行聚合。这些日志存储在Cassandra和Kafka中,前者用于历史分析目的,后者用于实时分析目的。每个日志都有一个日期和事件类型。为了简单起见,假设我想跟踪每天单一类型的日志数量。
我们有两个RDD,一个是从Cassandra获取的批处理数据的RDD,另一个是从Kafka获取的流处理数据的RDD。 伪代码如下:
CassandraJavaRDD<CassandraRow> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable(KEYSPACE, TABLE).select("date", "type");
JavaPairRDD<String, Integer> batchRDD = cassandraRowsRDD.mapToPair(new PairFunction<CassandraRow, String, Integer>() {
@Override
public Tuple2<String, Integer> call(CassandraRow row) {
return new Tuple2<String, Integer>(row.getString("date"), 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) {
return count1 + count2;
}
});
save(batchRDD) // Assume this saves the batch RDD somewhere
...
// Assume we read a chunk of logs from the Kafka stream every x seconds.
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(...);
JavaPairDStream<String, Integer> streamRDD = kafkaStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, Integer>() {
@Override
public Iterator<Tuple2<String, Integer> call(Tuple2<String, String> data) {
String jsonString = data._2;
JSON jsonObj = JSON.parse(jsonString);
Date eventDate = ... // get date from json object
// Assume startTime is broadcast variable that is set to the time when the job started.
if (eventDate.after(startTime.value())) {
ArrayList<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>();
pairs.add(new Tuple2<String, Integer>(jsonObj.get("date"), 1));
return pairs;
} else {
return new ArrayList<Tuple2<String, Integer>>(0); // Return empty list when we ignore some logs
}
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) {
return count1 + count2;
}
}).updateStateByKey(new Function2<List<Integer>, Optional<List<Integer>>, Optional<Integer>>() {
@Override
public Optional<Integer> call(List<Integer> counts, Optional<Integer> state) {
Integer previousValue = state.or(0l);
Integer currentValue = ... // Sum of counts
return Optional.of(previousValue + currentValue);
}
});
save(streamRDD); // Assume this saves the stream RDD somewhere
sc.start();
sc.awaitTermination();
问题:
如何将streamRDD的结果与batchRDD合并?
假设batchRDD
具有以下数据,并且此作业在2014-10-16运行:
("2014-10-15", 1000000)
("2014-10-16", 2000000)
由于Cassandra查询仅包括批量查询开始时间之前的所有数据,因此在查询完成时必须从Kafka读取,仅考虑作业开始时间之后的日志。我们假设查询需要很长时间。这意味着我需要将历史结果与流式结果结合起来。
举个例子:
|------------------------|-------------|--------------|--------->
tBatchStart tStreamStart streamBatch1 streamBatch2
假设在第一个流批次中,我们获得了以下数据:
("2014-10-19", 1000)
接下来,我想将批处理RDD与这个流式RDD组合在一起,以便流式RDD现在具有以下值:
("2014-10-19", 2001000)
假设在第二个流批处理中,我们获取了以下数据:
("2014-10-19", 4000)
那么流式RDD应该更新为具有以下值:
("2014-10-19", 2005000)
等等...
可以使用 streamRDD.transformToPair(...)
将 streamRDD 数据与 batchRDD 数据使用 join
结合在一起,但如果我们对每个 stream chunk 都这样做,那么我们将为每个 stream chunk 添加来自 batchRDD 的计数,从而使状态值“重复计算”,而它只应该被添加到第一个 stream chunk。
rdd.leftOuterJoin(defaultRdd)
而不是rdd.union(defaultRdd)
,这样runningTotal
就不会包括未更改的键值对。然后,我只需要保存值已更改的键值对。 - Bobby