Flink流式处理:如何实现由起始和结束元素定义的窗口?

7
我有以下格式的数据,
SIP|2405463430|4115474257|8.205142580136622E12|2016年11月08日星期二16:58:58 IST|INVITE RTP|2405463430|4115474257|8.205142580136622E12|2016年11月08日星期二16:58:58 IST|0 RTP|2405463430|4115474257|8.205142580136622E12|2016年11月08日星期二16:58:58 IST|1 RTP|2405463430|4115474257|8.205142580136622E12|2016年11月08日星期二16:58:58 IST|2 RTP|2405463430|4115474257|8.205142580136622E12|2016年11月08日星期二16:58:58 IST|3 RTP|2405463430|4115474257|8.205142580136622E12|2016年11月08日星期二16:58:58 IST|4 RTP|2405463430|4115474257|8.205142580136622E12|2016年11月08日星期二16:58:58 IST|5 RTP|2405463430|4115474257|8.205142580136622E12|2016年11月08日星期二16:58:58 IST|6 RTP|2405463430|4115474257|8.205142580136622E12|2016年11月08日星期二16:58:58 IST|7 RTP|2405463430|4115474257|8.205142580136622E12|2016年11月08日星期二16:58:58 IST|8 RTP|2405463430|4115474257|8.205142580136622E12|2016年11月08日星期二16:58:58 IST|9 SIP|2405463430|4115474257|8.205142580136622E12|2016年11月08日星期二16:58:58 IST|BYE
我希望当遇到SIP-INVITE消息并在遇到SIP-BYE消息时触发事件,执行一些聚合操作。
怎么做呢?对于给定的用户,SIP-INVITE消息可能在任何时间点到达,我也可能同时有多个用户的SIP-INVITE消息。
1个回答

3
我认为你可以通过按用户键入全局窗口来解决你的用例。全局窗口根据键收集所有数据,并将触发和清除窗口的责任推给用户定义的Trigger函数。
全局窗口的定义如下:
val input: DataStream[(String, Int, String)] = ??? // (userId, value, marker)
val agg = input
  // one global window per user (handles overlapping SIP-INVITE events).
  .keyBy(_._1)
  // collect all data for each user until the trigger fires and purges the window.
  .window(GlobalWindows.create())
  // you have to implement a custom trigger which reacts on the marker.
  .trigger(new YourCustomTrigger())
  // the window function computes your aggregation.
  .apply(new YourWindowFunction())

我认为以下触发器可以实现此目的(假设一个 SIP-INVITE 事件总是开始一个会话)。Trigger.onElement() 方法应检查 SIP-BYE 字段并触发窗口评估和清除窗口,即返回 TriggerResult.FIRE_AND_PURGE。这将调用评估函数并删除窗口状态。
请注意,如果要支持乱序事件,则需要特别注意(在这种情况下,您应设置一个事件时间计时器来关闭元素的时间戳,以确保接收到时间戳之前的所有数据)。如果有应该丢弃的数据,因为它不在 SIP-INVITESIP-BYE 之间,你也需要处理。
有关详细信息,请参见 全局窗口触发器 的文档,以及 [Trigger][3] 的 JavaDocs,还有这篇博客文章

谢谢 :) 这非常有帮助! - Priya Ravichander

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接