如何从程序中停止flink流处理作业?

11

我正在尝试为一个将数据写入kafka话题并从同一kafka话题读取数据的Flink流作业创建JUnit测试,使用FlinkKafkaProducer09FlinkKafkaConsumer09。我在生产中传递了一个测试数据:

DataStream<String> stream = env.fromElements("tom", "jerry", "bill");

检查消费者是否提供相同的数据:

List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result =  resultSink.getResult();
assertEquals(expected, result);
使用TestListResultSink,我能够按预期查看来自消费者的数据并打印流。但是由于消费者会在消息完成后继续运行,因此无法获得Junit测试结果。是否有任何方式可以在FlinkFlinkKafkaConsumer09中停止进程或运行特定时间?
5个回答

9
根本问题在于流式程序通常不是有限的,而是无限地运行下去。
目前最好的方法是在流中插入一个特殊的控制消息,让源头适当终止(通过离开读取循环来停止读取更多数据)。这样,Flink会告诉所有下游操作符,它们可以在消耗完所有数据后停止运行。
或者,您可以在源头抛出一个特殊的异常(例如,在一段时间之后),以便您可以区分“正确”的终止和失败情况(通过检查错误原因)。在源头抛出异常将导致程序失败。

嗨@TillRohrmann,感谢您的回复。我已经尝试在map函数中处理完所有3个元素后抛出一些异常。但是在这种情况下,JUnit测试显示为失败,而我不希望出现这种情况。如果您能给我展示一个例子,那就太好了。提前致谢! - Mike
我通过以下方式成功让我的Flink作业在单元测试中停止:1)在代码中添加一个test-only标志,2)在图的某个阶段 Thread.sleep(10000) 然后抛出一个已知消息的异常,3)try { env.execute(); } catch ... 并检查异常消息,如果是已知消息,则吞下该异常;否则再次抛出该异常。 - winitzki

4

在测试中,您可以在单独的线程中启动作业执行,等待一些时间以允许进行数据处理,取消该线程(它将中断作业),然后进行断言。

CompletableFuture<Void> handle = CompletableFuture.runAsync(() -> {
    try {
        environment.execute(jobName);
    } catch (Exception e) {
        e.printStackTrace();
    }
});
try {
    handle.get(seconds, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    handle.cancel(true); // this will interrupt the job execution thread, cancel and close the job
}

// Make assertions here

1
你能否在反序列化器中不使用isEndOfStream重写以停止从Kafka获取数据?如果我读得正确,flink / Kafka09Fetcher在其运行方法中有以下代码,会打破事件循环。
    if (deserializer.isEndOfStream(value)) {
                        // end of stream signaled
                        running = false;
                        break;
                    }

我的想法是使用Till Rohrmann的控制消息与isEndOfStream方法结合使用,告诉KafkaConsumer停止读取。
有任何不能正常工作的原因吗?或者我忽略了一些特殊情况吗?

https://github.com/apache/flink/blob/07de86559d64f375d4a2df46d320fc0f5791b562/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L146


0

0

我在@Mariusz W.给出的答案基础上使用了Awaitility来运行我的Flink应用程序,仅在本地测试期间直到满足某些条件,这应该缩短测试时间:

  • https://www.baeldung.com/awaitility-testing
  • 我的Flink应用程序正在将数据汇入Kinesis数据流(在本地使用localstack测试容器进行模拟),因此我编写了以下内容,仅在“x”条记录到达输出Kinesis数据流时才运行Flink应用程序
  • 个人认为这比设置某种控制消息机制要容易得多
    private static void runFlinkUntilConditionIsTrue() {
        CompletableFuture<Void> handle = CompletableFuture.runAsync(() -> {
            try {
                env.execute();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        try {
            Awaitility
                    .await()
                    .atMost(Duration.ofSeconds(30L)) // max duration for which to check the condition before giving up
                    .until(
                            // poll until some condition is met (or until max duration runs out)
                            // for me, this was getRecordsInLocalKinesisOutputTopic() == x
                            () -> someCondition()
                    );
        } catch (ConditionTimeoutException e) {
            System.out.println("Condition not met in time");
        } finally {
            try {
                handle.get(0, TimeUnit.SECONDS);
            } catch (TimeoutException | ExecutionException | InterruptedException e) {
                // this will interrupt the job execution thread, cancel and close the job
                handle.cancel(true);
            }
        }
    }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接