Apache Flink DataStream API 没有 mapPartition 转换。

7

Spark DStream中有mapPartition API,而Flink DataStream API没有。是否有人能够帮助解释原因。我想要实现一个与Spark reduceByKey类似的API在Flink上。

2个回答

8
Flink的流处理模型与以小批量为中心的Spark Streaming有很大不同。在Spark Streaming中,每个小批量都像常规批处理程序一样在有限的数据集上执行,而Flink DataStream程序则会持续处理记录。
在Flink的DataSet API中,MapPartitionFunction有两个参数:输入的迭代器和函数结果的收集器。在Flink DataStream程序中,MapPartitionFunction永远不会从第一个函数调用返回,因为迭代器将遍历无尽的记录流。但是,Flink的内部流处理模型要求用户函数返回以检查点功能状态。因此,DataStream API不提供mapPartition转换。
为了实现类似于Spark Streaming的reduceByKey功能,您需要在流上定义一个键控窗口。窗口离散化流,类似于小批处理,但窗口提供了更多的灵活性。由于窗口大小是有限的,因此可以调用窗口的reduce方法。
代码示例如下:
yourStream.keyBy("myKey") // organize stream by key "myKey"
          .timeWindow(Time.seconds(5)) // build 5 sec tumbling windows
          .reduce(new YourReduceFunction); // apply a reduce function on each window

DataStream文档展示了如何定义不同类型的窗口,并解释了所有可用的函数。

注意:DataStream API最近已经进行了重构。该示例假定使用最新版本(0.10-SNAPSHOT),该版本将在未来几天发布为0.10.0。


你提供的 'reduceByKey' 解决方案似乎与 Spark 中的 'GroupByKey' 相似,而不是 'reduceByKey'。https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html - Jun
不,Flink的reduce()与Spark的reduceByKey一样,在组上应用一对一的缩减函数。但是组的定义有些不同,因为Flink使用窗口和Spark键值对在小批量中。在Flink中没有直接等价于Spark的groupByKey,因为这意味着需要将整个组材料化到内存中,这可能会导致OutOfMemoryErrors并杀死JVM。Flink提供了groupReduce()来消耗流迭代器。 - Fabian Hueske
我看到Flink的reduce()方法应用了可组合性。是不是出于类似的原因,Flink DataStream没有像mapPartition那样的reduceGroup API呢? - Jun
是的和不是的 :-). 对于非窗口流来说是“是”,因为它们具有无限长度,而groupReduce方法永远不会返回。对于窗口流来说是“不是”(窗口具有有限长度),其中apply()方法本质上是一个groupReduce函数,此外还提供了一些窗口的元数据。 - Fabian Hueske
谢谢,看来我找不到你提到的这种apply方法。 - Jun

0
假设您的输入流是单个分区数据(例如字符串)。
val new_number_of_partitions = 4

//below line partitions your data, you can broadcast data to all partitions
val step1stream = yourStream.rescale.setParallelism(new_number_of_partitions)

//flexibility for mapping
val step2stream = step1stream.map(new RichMapFunction[String, (String, Int)]{
  // var local_val_to_different_part : Type = null
  var myTaskId : Int = null

  //below function is executed once for each mapper function (one mapper per partition)
  override def open(config: Configuration): Unit = {
    myTaskId = getRuntimeContext.getIndexOfThisSubtask
    //do whatever initialization you want to do. read from data sources..
  }

  def map(value: String): (String, Int) = {
    (value, myTasKId)
  }
})

val step3stream = step2stream.keyBy(0).countWindow(new_number_of_partitions).sum(1).print
//Instead of sum(1), you can use .reduce((x,y)=>(x._1,x._2+y._2))
//.countWindow will first wait for a certain number of records for perticular key
// and then apply the function

Flink Streaming 是纯流式处理(不是批处理)。请看迭代 API。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接