如何使用Apache Flink处理乱序事件?

4
为了测试流处理和Flink,我给自己提出了一个看似简单的问题。我的数据流包括粒子的xy坐标以及记录位置的时间t。我的目标是为这些数据注释特定粒子的速度。因此,流可能看起来像这样。
<timestamp:Long> <particle_id:String> <x:Double> <y:Double>

1612103771212 p1 0.0 0.0
1612103771212 p2 0.0 0.0
1612103771213 p1 0.1 0.1
1612103771213 p2 -0.1 -0.1
1612103771214 p1 0.1 0.2
1612103771214 p2 -0.1 -0.2
1612103771215 p1 0.2 0.2
1612103771215 p2 -0.2 -0.2

现在不能保证事件的顺序,例如1612103771213 p2 -0.1 -0.1可能会在1612103771212 p2 0.0 0.0之前达到10ms。

为简单起见,可以假设任何延迟数据都将在早期数据的100ms内到达。

我承认我对流处理和Flink是新手,所以这可能是一个愚蠢的问题,并且有明显的答案,但我目前还想不出如何实现我的目标。

编辑

按照David的回答,我尝试使用Flink Table API对Datastream进行排序,使用nc-lk 9999进行文本套接字流。问题在于,直到关闭文本套接字流之前,没有任何内容会打印到控制台。以下是我编写的Scala代码-


package processor

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, FieldExpression, WithOperations}
import org.apache.flink.util.Collector

import java.time.Duration


object AnnotateJob {

  val OUT_OF_ORDER_NESS = 100

  def main(args: Array[String]) {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val bSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

    val tableEnv = StreamTableEnvironment.create(env, bSettings)

    env.setParallelism(1)

    // Obtain the input data by connecting to the socket. Here you want to connect to the local 9999 port.
    val text = env.socketTextStream("localhost", 9999)
    val objStream = text
      .filter( _.nonEmpty )
      .map(new ParticleMapFunction)

    val posStream = objStream
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[ParticlePos](Duration.ofMillis(OUT_OF_ORDER_NESS))
          .withTimestampAssigner(new SerializableTimestampAssigner[ParticlePos] {
            override def extractTimestamp(t: ParticlePos, l: Long): Long = t.t
          })
      )

    val tablePos = tableEnv.fromDataStream(posStream, $"t".rowtime() as "et", $"t", $"name", $"x", $"y")
    tableEnv.createTemporaryView("pos", tablePos)
    val sorted = tableEnv.sqlQuery("SELECT t, name, x, y FROM pos ORDER BY et ASC")

    val sortedPosStream = tableEnv.toAppendStream[ParticlePos](sorted)

    // sortedPosStream.keyBy(pos => pos.name).process(new ValAnnotator)

    sortedPosStream.print()

    // execute program
    env.execute()
  }

  case class ParticlePos(t : Long, name : String, x : Double, y : Double) extends Serializable
  case class ParticlePosVal(t : Long, name : String, x : Double, y : Double,
                            var vx : Double = 0.0, var vy : Double = 0.0) extends Serializable

  class ParticleMapFunction extends MapFunction[String, ParticlePos] {
    override def map(t: String): ParticlePos = {
      val parts = t.split("\\W+")
      ParticlePos(parts(0).toLong, parts(1), parts(2).toDouble, parts(3).toDouble)
    }
  }

}

2个回答

3
一般来说,水印和事件时间计时器的结合是解决事件流乱序问题的方案。官方Flink培训中涵盖事件时间和水印的部分解释了其工作原理。
在更高层次上,有时使用类似于Flink的CEP库或Flink SQL更容易,因为它们使按时间对流进行排序变得非常容易,从而消除了所有的乱序。例如,参见如何使用Flink SQL按事件时间对流进行排序,其中介绍了一个使用Flink SQL按事件时间对流进行排序的Flink DataStream程序示例。
在您的情况下,一个相当简单的MATCH_RECOGNIZE查询就能实现您要找的功能。可能看起来像这样:
SELECT *
    FROM event
    MATCH_RECOGNIZE (
        PARTITION BY particleId
        ORDER BY ts
        MEASURES 
            b.ts, 
            b.particleId, 
            velocity(a, b)
        AFTER MATCH SKIP TO NEXT ROW
        PATTERN (a b)
        DEFINE
            a AS TRUE,
            b AS TRUE
    )

其中,velocity(a, b)是一个用户自定义的函数,用于计算同一粒子的两个连续事件(a和b)之间的速度。


嘿,David,我按照你提供的链接中的帖子进行了操作,并尝试使用套接字实现我的数据流解决方案。问题是在关闭套接字连接之前,控制台上没有任何输出。我已经编辑了我的问题并附上了我正在使用的代码,请帮忙看看,非常感谢。 - Optimus
1
这意味着作业看到的第一个水印是由套接字关闭生成的,它会注入一个时间为MAX_WATERMARK的水印。可能有两件事情阻止了更早的水印被创建:要么没有具有足够大时间戳的事件,要么作业没有运行足够长的时间(默认情况下,有序性边界策略每200毫秒调用一次以获取新的水印)。 - David Anderson
1
谢谢David,我没有任何具有足够大时间戳的事件,现在我可以看到输出了。 - Optimus

2
在Flink中,实现这个功能的一种方法可能是使用KeyedProcessFunction,即一个可以处理以下功能的函数:
  • 处理流中的每个事件
  • 维护一些状态
  • 基于事件时间触发一些逻辑
因此,它的实现大致如下:
  • 您需要了解有关数据的某种“最大乱序度”的信息。根据您的描述,例如假设为100毫秒,这意味着在处理时间戳1612103771212的数据时,您决定考虑已接收到直到1612103771112的所有数据。
  • 第一步是对流进行keyBy()操作,按粒子ID分组。这意味着您Flink应用程序中的下一个运算符逻辑现在可以表示为仅涉及一个粒子的一系列事件,并且每个粒子以这种方式并行处理。
大致如下:
yourStream.keyBy(...lookup p1 or p2 here...).process(new YourProcessFunction())
  • YourProcessFunctionProcessFunction初始化期间(即在open方法中),请初始化一个ListState,您可以安全地将内容存储在其中。
  • 在处理流中的元素时,在processElement方法中,只需将其添加到listState中,并注册100ms的计时器触发器。
  • onTimer()方法触发时,比如在时间t时,查看listState中所有具有时间<t-100的元素,如果您至少有两个这样的元素,请对它们进行排序、从状态中删除它们,应用您描述的速度计算和注释逻辑,并向下游发出结果。

官方Flink培训示例中,您会发现这种逻辑被用于出租车行程的持续时间,这与您的用例有很多相似之处。还可以查看该存储库的各个Readme.md文件以获取更多详细信息。


已经使用keyBy(particleID)了,我会研究flink培训示例,看看如何将其应用到我的案例中。 - Optimus

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接