在Apache Spark中将批处理RDD和流处理RDD的结果合并

9

背景: 我正在使用Apache Spark来对日志中不同事件类型的运行次数进行聚合。这些日志存储在Cassandra和Kafka中,前者用于历史分析目的,后者用于实时分析目的。每个日志都有一个日期和事件类型。为了简单起见,假设我想跟踪每天单一类型的日志数量。

我们有两个RDD,一个是从Cassandra获取的批处理数据的RDD,另一个是从Kafka获取的流处理数据的RDD。 伪代码如下:

CassandraJavaRDD<CassandraRow> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable(KEYSPACE, TABLE).select("date", "type");

JavaPairRDD<String, Integer> batchRDD = cassandraRowsRDD.mapToPair(new PairFunction<CassandraRow, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(CassandraRow row) {
        return new Tuple2<String, Integer>(row.getString("date"), 1);
    }
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer count1, Integer count2) {
        return count1 + count2;
    }
});

save(batchRDD) // Assume this saves the batch RDD somewhere

...

// Assume we read a chunk of logs from the Kafka stream every x seconds.
JavaPairReceiverInputDStream<String, String> kafkaStream =  KafkaUtils.createStream(...);
JavaPairDStream<String, Integer> streamRDD = kafkaStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, Integer>() {
    @Override
    public Iterator<Tuple2<String, Integer> call(Tuple2<String, String> data) {
        String jsonString = data._2;
        JSON jsonObj = JSON.parse(jsonString);
        Date eventDate = ... // get date from json object
        // Assume startTime is broadcast variable that is set to the time when the job started.
        if (eventDate.after(startTime.value())) { 
            ArrayList<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>();
            pairs.add(new Tuple2<String, Integer>(jsonObj.get("date"), 1));
            return pairs;
        } else {
            return new ArrayList<Tuple2<String, Integer>>(0); // Return empty list when we ignore some logs
        }
    }
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer count1, Integer count2) {
        return count1 + count2;
    }
}).updateStateByKey(new Function2<List<Integer>, Optional<List<Integer>>, Optional<Integer>>() {
    @Override
    public Optional<Integer> call(List<Integer> counts, Optional<Integer> state) {
        Integer previousValue = state.or(0l);
        Integer currentValue = ... // Sum of counts
        return Optional.of(previousValue + currentValue);
    }
});
save(streamRDD); // Assume this saves the stream RDD somewhere

sc.start();
sc.awaitTermination();

问题: 如何将streamRDD的结果与batchRDD合并? 假设batchRDD具有以下数据,并且此作业在2014-10-16运行:

("2014-10-15", 1000000)
("2014-10-16", 2000000)

由于Cassandra查询仅包括批量查询开始时间之前的所有数据,因此在查询完成时必须从Kafka读取,仅考虑作业开始时间之后的日志。我们假设查询需要很长时间。这意味着我需要将历史结果与流式结果结合起来。

举个例子:

    |------------------------|-------------|--------------|--------->
tBatchStart             tStreamStart   streamBatch1  streamBatch2

假设在第一个流批次中,我们获得了以下数据:
("2014-10-19", 1000)

接下来,我想将批处理RDD与这个流式RDD组合在一起,以便流式RDD现在具有以下值:

("2014-10-19", 2001000)

假设在第二个流批处理中,我们获取了以下数据:
("2014-10-19", 4000)

那么流式RDD应该更新为具有以下值:

("2014-10-19", 2005000)

等等...

可以使用 streamRDD.transformToPair(...) 将 streamRDD 数据与 batchRDD 数据使用 join 结合在一起,但如果我们对每个 stream chunk 都这样做,那么我们将为每个 stream chunk 添加来自 batchRDD 的计数,从而使状态值“重复计算”,而它只应该被添加到第一个 stream chunk。

2个回答

5
为了解决这个问题,我会将基本的RDD与聚合StateDStream的结果合并起来,以保持流数据的总数。这有效地为每个流间隔报告的数据提供了一个基准,而不会重复计算该基准x次。
我使用了示例WordCount尝试了这个想法,它是可行的。在REPL上运行以下命令可以进行实时演示:
(在单独的shell上使用nc -lk 9876来提供输入到socketTextStream
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

@transient val defaults = List("magic" -> 2, "face" -> 5, "dust" -> 7 )
val defaultRdd = sc.parallelize(defaults)

@transient val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint("/tmp/spark")

val lines = ssc.socketTextStream("localhost", 9876, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCount = words.map(x => (x, 1)).reduceByKey(_ + _)
val historicCount = wordCount.updateStateByKey[Int]{(newValues: Seq[Int], runningCount: Option[Int]) => 
    Some(newValues.sum + runningCount.getOrElse(0))
}
val runningTotal = historicCount.transform{ rdd => rdd.union(defaultRdd)}.reduceByKey( _+_ )

wordCount.print()
historicCount.print()
runningTotal.print()
ssc.start()

2
感谢。我只想补充一点,即在转换中,我使用了 rdd.leftOuterJoin(defaultRdd) 而不是 rdd.union(defaultRdd),这样 runningTotal 就不会包括未更改的键值对。然后,我只需要保存值已更改的键值对。 - Bobby

1
你可以尝试使用updateStateByKey
def main(args: Array[String]) {

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
        val currentCount = values.foldLeft(0)(_ + _)
        val previousCount = state.getOrElse(0)
        Some(currentCount + previousCount)
    }

    // stream
    val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))
    ssc.checkpoint(".")
    val lines = ssc.socketTextStream("127.0.0.1", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val stateWordCounts = pairs.updateStateByKey[Int](updateFunc)
    stateWordCounts.print()
    ssc.start()
    ssc.awaitTermination()
}

1
我已经在使用它了。问题是,如果可选状态值为null,则必须默认为一个值。理想情况下,这个值应该是从批处理RDD计算出来的值。问题是,updateStateByKey()不会传递键,因此我无法查找从批处理RDD计算出的值。 - Bobby

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接