Apache Flink - 如果 x 分钟没有收到数据,则发送事件

11

我该如何使用Flink的DataStream API实现一个操作符,当从数据流中未接收到一定时间的数据时,发送一个事件?

2个回答

17

这样的运算符可以使用ProcessFunction来实现。

DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L);

input
  // use keyBy to have keyed state. 
  // NullByteKeySelector will move all data to one task. You can also use other keys
  .keyBy(new NullByteKeySelector())
  // use process function with 60 seconds timeout
  .process(new TimeOutFunction(60 * 1000));

TimeOutFunction 的定义如下。在此示例中,它使用处理时间。

public static class TimeOutFunction extends ProcessFunction<Long, Boolean> {

  // delay after which an alert flag is thrown
  private final long timeOut;
  // state to remember the last timer set
  private transient ValueState<Long> lastTimer;

  public TimeOutFunction(long timeOut) {
    this.timeOut = timeOut;
  }

  @Override
  public void open(Configuration conf) {
    // setup timer state
    ValueStateDescriptor<Long> lastTimerDesc = 
      new ValueStateDescriptor<Long>("lastTimer", Long.class);
    lastTimer = getRuntimeContext().getState(lastTimerDesc);
  }

  @Override
  public void processElement(Long value, Context ctx, Collector<Boolean> out) throws Exception {
    // get current time and compute timeout time
    long currentTime = ctx.timerService().currentProcessingTime();
    long timeoutTime = currentTime + timeOut;
    // register timer for timeout time
    ctx.timerService().registerProcessingTimeTimer(timeoutTime);
    // remember timeout time
    lastTimer.update(timeoutTime);
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Boolean> out) throws Exception {
    // check if this was the last timer we registered
    if (timestamp == lastTimer.value()) {
      // it was, so no data was received afterwards.
      // fire an alert.
      out.collect(true);
    }
  }
}

3
一个小调整。如果流至少接收到一次数据,则此设置是正常的。有没有办法检测,如果流根本没有接收到数据。甚至一次都没有? - madhairsilence
我能否在Kafka消费者中使用您的解决方案,包括在processElement中使用out.collect?(我的完整问题请参见https://stackoverflow.com/questions/58280077/apache-flink-kafka-stream-get-all-messages-and-stop)。目前我的消费者不会停止并且会无限获取。 - techkuz
在上面的例子中,由于我们注册了一个ProcessingTimeTimer。我们应该指定TimeCharacteristic.EventTime/ProcessingTime,还是在这种情况下没有关系。有什么提示吗? - Mazen Ezzeddine
您可以随时注册处理时间计时器。如果您需要事件时间,则应通过TimeCharacteristics启用它。 - Fabian Hueske
2
这并没有回答问题。只有在处理函数至少接收到一个元素时,才能起作用,因为定时器只能在“processElement”中创建。 - kekkler

1
您可以使用自定义触发器函数设置时间窗口。在触发器函数中,每次收到事件时,“onEvent”方法都会将processingTimeTrigger设置为“currentTime + desiredTimeDelay”。然后,当新事件出现时,您会删除先前设置的触发器并创建一个新的触发器。如果在系统时间达到processingTimeTrigger的时间之前没有事件到达,则触发器会触发,窗口将被处理。即使没有事件到达,要处理的事件列表也将为空。

有代码吗?至少有片段可以提供吗? - madhairsilence
我会选择@Fabian Hueske的答案。对于您的目的来说更加直接。 - Jicaar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接