Flink流式处理中事件时间窗口排序

10

我遇到一些困惑,不太理解事件时间窗口的语义。以下程序生成了一些具有时间戳的元组,这些时间戳被用作事件时间,并进行简单的窗口聚合。我期望输出与输入顺序相同,但实际上输出的顺序不同。为什么输出与事件时间的顺序不一致?

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

object WindowExample extends App {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.enableTimestamps()
    env.setParallelism(1)

    val start = 1449597577379L
    val tuples = (1 to 10).map(t => (start + t * 1000, t))

    env.fromCollection(tuples)
      .assignAscendingTimestamps(_._1)
      .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
      .sum(1)
      .print()

    env.execute()
}

输入:

 (1449597578379,1)
 (1449597579379,2)
 (1449597580379,3)
 (1449597581379,4)
 (1449597582379,5)
 (1449597583379,6)
 (1449597584379,7)
 (1449597585379,8)
 (1449597586379,9)
 (1449597587379,10)

结果:

[info] (1449597579379,2)
[info] (1449597581379,4)
[info] (1449597583379,6)
[info] (1449597585379,8)
[info] (1449597587379,10)
[info] (1449597578379,1)
[info] (1449597580379,3)
[info] (1449597582379,5)
[info] (1449597584379,7)
[info] (1449597586379,9)
1个回答

12
这种行为的原因在于,在Flink中元素的顺序(相对于时间戳)不被考虑。只有水印的正确性及其与元素时间戳的关系对于考虑时间的操作是重要的,因为水印通常会触发基于时间的操作中的计算。
在您的示例中,窗口操作符将源中的所有元素存储在内部窗口缓冲区中。然后,源会发出一个水印,表示未来不会到达任何时间戳较小的元素。这反过来告诉窗口操作符处理所有结束时间戳低于水印的窗口(对于所有窗口都是正确的)。因此,它会发出所有窗口(具有任意顺序),然后再发出一个水印。从此处向下游操作接收元素并能够在接收到水印后进行处理。
默认情况下,源发出水印的间隔是200毫秒。由于您的源发出的元素数量很少,因此所有元素都会在第一个水印之前发出。在实际的使用案例中,水印发射间隔远小于窗口大小,则可以得到按时间戳顺序发出窗口的预期行为。例如,如果您有1小时的窗口和每500毫秒的水印。

1
你能否提供或指出一个下游操作的例子,该操作可以在接收到水印后根据事件时间重新排序元素?谢谢! - Maksim Kolchin
1
@MaximKolchin 这种重新排序的情况在CEP库中经常发生。你可以在这里看一下:https://github.com/apache/flink/blob/master/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java - Dawid Wysakowicz

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接