测试kafka和flink的集成流程

3
我想测试Kafka/Flink的整合,例如使用FlinkKafkaConsumer011FlinkKafkaProducer011
流程如下:
  1. 使用Flink从Kafka中读取
  2. 使用Flink进行操作
  3. 使用Flink将结果写入另一个Kafka Topic
一个字符串的示例可以是:从输入Topic中读取字符串,将其转换为大写,然后写入新的Topic。
问题是如何测试这个流程?
我所说的测试是单元/集成测试。
谢谢!

您可以查看https://github.com/manub/scalatest-embedded-kafka。这是一个在Kafka上进行测试的优秀工具。 - alifirat
我已经检查了这个库,问题是在Flink中我们必须使用 FlinkKafkaConsumer***FlinkKafkaProducer***,似乎它不能与此嵌入式kafka一起工作,因为它们不是KafkaConsumer或KafkaProducer。 - Thomas
1个回答

5
Flink文档中有一小节介绍如何为您的转换操作编写单元测试\集成测试:链接。该文档还有一个关于测试检查点和状态处理以及使用AbstractStreamOperatorTestHarness的小节。
然而,我认为您更感兴趣的是端到端(包括源和接收器)集成测试。为此,您可以启动Flink Mini Cluster。这是一个示例代码,用于启动Flink Mini Cluster:链接
您还可以在JVM内部启动Kafka Broker,用于测试目的。Flink的Kafka连接器就是用它进行集成测试的。这里是一个启动Kafka服务器的样例代码:链接
如果您在本地运行,则可以使用简单的生成器应用程序为您的源Kafka Topic生成消息(有许多可用的选项。您可以连续生成消息或基于不同的配置间隔生成消息)。这里有一个示例,说明如何在本地运行时设置Flink作业全局参数:Kafka010Example
另一种选择是创建一个集成环境(而不是生产环境)来运行端到端测试。您将能够真正感受到您的程序在类似生产环境的环境中的行为。始终建议拥有完整的并行测试环境,包括测试源\接收器 Kafka 主题。

你有没有关于如何创建这样的集成环境的建议?我想要从k8s开始一切,并为了在主题中引导一些数据,使我的作业能够读取它,我考虑创建一个新的作业来创建这样的数据,然后提供模拟响应以消耗来自我的生产作业的数据,这样我就可以模拟所有的生产环境。你知道或者有比这更好的想法吗? - Georgi Stoyanov

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接