Spark DStream中有mapPartition
API,而Flink DataStream
API没有。是否有人能够帮助解释原因。我想要实现一个与Spark reduceByKey
类似的API在Flink上。
Spark DStream中有mapPartition
API,而Flink DataStream
API没有。是否有人能够帮助解释原因。我想要实现一个与Spark reduceByKey
类似的API在Flink上。
yourStream.keyBy("myKey") // organize stream by key "myKey"
.timeWindow(Time.seconds(5)) // build 5 sec tumbling windows
.reduce(new YourReduceFunction); // apply a reduce function on each window
DataStream文档展示了如何定义不同类型的窗口,并解释了所有可用的函数。
注意:DataStream API最近已经进行了重构。该示例假定使用最新版本(0.10-SNAPSHOT),该版本将在未来几天发布为0.10.0。
val new_number_of_partitions = 4
//below line partitions your data, you can broadcast data to all partitions
val step1stream = yourStream.rescale.setParallelism(new_number_of_partitions)
//flexibility for mapping
val step2stream = step1stream.map(new RichMapFunction[String, (String, Int)]{
// var local_val_to_different_part : Type = null
var myTaskId : Int = null
//below function is executed once for each mapper function (one mapper per partition)
override def open(config: Configuration): Unit = {
myTaskId = getRuntimeContext.getIndexOfThisSubtask
//do whatever initialization you want to do. read from data sources..
}
def map(value: String): (String, Int) = {
(value, myTasKId)
}
})
val step3stream = step2stream.keyBy(0).countWindow(new_number_of_partitions).sum(1).print
//Instead of sum(1), you can use .reduce((x,y)=>(x._1,x._2+y._2))
//.countWindow will first wait for a certain number of records for perticular key
// and then apply the function
Flink Streaming 是纯流式处理(不是批处理)。请看迭代 API。
reduce()
与Spark的reduceByKey
一样,在组上应用一对一的缩减函数。但是组的定义有些不同,因为Flink使用窗口和Spark键值对在小批量中。在Flink中没有直接等价于Spark的groupByKey
,因为这意味着需要将整个组材料化到内存中,这可能会导致OutOfMemoryErrors并杀死JVM。Flink提供了groupReduce()
来消耗流迭代器。 - Fabian HueskegroupReduce
方法永远不会返回。对于窗口流来说是“不是”(窗口具有有限长度),其中apply()
方法本质上是一个groupReduce
函数,此外还提供了一些窗口的元数据。 - Fabian Hueske