Apache Flink: 多窗口聚合和延迟数据

4
我们计划在大规模的IOT设置中使用Apache Flink。客户将向我们发送某种结构化传感器数据(例如sensor_id、sensor_type、sensor_value、timestamp)。我们无法控制每个客户何时发送此数据,很可能是实时的,但我们没有保证。我们将所有事件存储在RabbitMQ/Kafka中。更新:可以假设来自传感器的事件按顺序到达。
在开始实现可能的流水线之前,我们对以下挑战的解决方案感兴趣:
1.多窗口聚合
我们将所有原始传感器数据存储在Cassandra中。此外,我们希望按sensor_id对传感器数据进行多个时间窗口的聚合(例如15秒、1分钟、15分钟、1小时、1天)。使用Flink流式处理高效地实现所需输出的推荐方法是什么?
2.延迟严重的数据
如前所述,我们无法控制数据何时被发送。例如,客户可能遇到网络故障,因此数据可能会延迟到达。如何处理推荐方式?如果我们只能通过sensor_id保证好的水印(因为每个客户都有其自己的时间/问题/故障),我们该如何使用水印?我们可以添加一些允许的延迟时间(如6 - 12小时左右),这是否可行?flinks在内存窗口存储中如何处理超过允许延迟时间的数据?在超过允许的延迟时间后会发生什么?我们应该将真正延迟的数据存储到另一个kafka主题中并持续进行批处理吗?最后,一些客户会上传包含其收集的传感器数据的CSV文件。这是否也适用于批处理方式?
3.未来的数据
如果某个客户发送了远期数据(由于错误配置的传感器,因为我们无法控制它),则流会发生什么变化?
我们很想听听您的建议。谢谢。
1个回答

7

这里有几个问题,我会逐一回答:

  1. 多窗口聚合

您可以构建级联窗口操作符的数据流,并在每个窗口之后分离(发射或进行进一步处理)结果。

Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
                        \-> out_1        \-> out_2         \-> out_3
  1. 非常延迟的数据

问题似乎在于某些数据可能会“非常”延迟到达,而不是仅有按键排序的数据。目前无法使用按键水印。因此,“逻辑时钟”对于所有事件都是相同的。Flink 的允许迟到定义了状态保留多长时间以等待迟到的数据。如果数据迟到(超过水印),但在允许的延迟时间内到达,则相应的状态仍然可用,并计算更新。如果事件太迟(晚于允许的延迟时间),则状态被丢弃,并且事件也被丢弃。高允许延迟意味着需要保留更多的状态。但是,这个问题原则上可以通过扩展规模来解决。进入专用 Kafka 话题的延迟数据的处理也可以使用 Flink 进行。周期性文件也可以通过流处理器更好地进行连续处理。批处理解决方案需要处理跨越文件的数据(外部化状态处理)、作业调度、错误处理等。

  1. 未来的数据

使用 Flink 的水印机制,操作者始终转发其最高水印(时间不能倒退),但将其水印计算为从所有输入通道接收到的最小水印。因此,除非所有通道上都有未来的数据,否则您应该没问题。未来的数据将作为状态放置,并在时间到达“未来”时计算。这意味着,您不会丢失数据,但您可能需要等待相当长的时间才能处理它。


根据您的描述,我会考虑在键控流上实现聚合作为具有状态的 FlatMap 操作符。鉴于传感器的每个数据按顺序到达,您可以在 FlatMap(或一系列 FlatMap,每个时间间隔一个)中进行必要的聚合。

这里的一个挑战是在看到晚于聚合间隔的事件之前,你不知道何时关闭聚合。在具有全局有效水印的流中,即使没有接收到特定键的事件,时间也可以前进(并关闭窗口)。

另一个问题是在删除传感器的情况下清除状态,这不会被自动检测。也许可以使用特殊的标记记录来触发状态清理。


非常感谢您的全面回答。 - kaelumania
Q1:使用有状态的flatMap,我是否需要winow函数?我如何检查具有键x的元素是否到达了稍早于事件时间的窗口结束时间的窗口。然后我可以清除该窗口。 - kaelumania
Q2:Flink是否有可能将非常晚的数据放入专用主题?我们每个传感器都有定期文件,但所有数据都将放入同一个Kafka主题(sensor_data_csv)。因此,由于不同的文件上传跨越不同的时间跨度,所以水印会妨碍我们。 - kaelumania
Q3:我们有一个单一的流用于所有传感器,因此所有输入的最小值将是该单一流的最大值,对吗?我可以启动一个计时器,并在每个事件之后使其失效,当长时间没有数据到达时,它会在很长一段时间后触发以清除非常旧的窗口吗? - kaelumania
我认为SO评论不是问答的正确格式。我建议去用户邮件列表寻求进一步的问题解答(包括历史记录以供参考)。Q1:如果您使用FlatMapFunction,则必须自己进行检查。keyBy()和键控状态将有所帮助。Q2:目前还没有,这是一个正在讨论中的功能。Q3:如果单个流在并行处理,则使用所有分区的最小WM。支持用户函数中的计时器是另一个即将推出的功能。不确定实际状态如何。 - Fabian Hueske

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接