我正在尝试为一个将数据写入kafka话题并从同一kafka话题读取数据的Flink流作业创建JUnit测试,使用FlinkKafkaProducer09
和FlinkKafkaConsumer09
。我在生产中传递了一个测试数据:
DataStream<String> stream = env.fromElements("tom", "jerry", "bill");
检查消费者是否提供相同的数据:
List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result = resultSink.getResult();
assertEquals(expected, result);
使用TestListResultSink
,我能够按预期查看来自消费者的数据并打印流。但是由于消费者会在消息完成后继续运行,因此无法获得Junit测试结果。是否有任何方式可以在Flink
或FlinkKafkaConsumer09
中停止进程或运行特定时间?
Thread.sleep(10000)
然后抛出一个已知消息的异常,3)try { env.execute(); } catch ...
并检查异常消息,如果是已知消息,则吞下该异常;否则再次抛出该异常。 - winitzki