我正在使用Kafka Streams的TopologyTestDriver来测试我们的数据管道。这与所有简单的拓扑结构都很奏效,包括使用存储器的有状态拓扑结构。
但是当我尝试使用此测试驱动程序来测试使用窗口聚合的拓扑结构时,就出现了问题。
我复制了一个简单的示例,它将在10秒窗口内对接收到的相同键的整数求和。
public class TopologyWindowTests {
TopologyTestDriver testDriver;
String INPUT_TOPIC = "INPUT.TOPIC";
String OUTPUT_TOPIC = "OUTPUT.TOPIC";
@Before
public void setup(){
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
// EventProcessor is a <String,String> processor
// so we set those serders
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
testDriver = new TopologyTestDriver(defineTopology(),config,0L);
}
/**
* topology test
*/
@Test
public void testTopologyNoCorrelation() throws IOException {
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), new IntegerSerializer());
testDriver.pipeInput(factory.create(INPUT_TOPIC,"k",2,1L));
ProducerRecord<String, Integer> outputRecord = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new IntegerDeserializer());
Assert.assertNull(outputRecord);
}
@After
public void tearDown() {
testDriver.close();
}
/**
* Defines topology
* @return
*/
public Topology defineTopology(){
StreamsBuilder builder = new StreamsBuilder();
KStream<String,Integer> inputStream = builder.stream(INPUT_TOPIC);
KTable<Windowed<String>, Integer> groupedMetrics = inputStream.groupBy((key,value)->key,
Serialized.with(Serdes.String(),Serdes.Integer())).windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10))).aggregate(
()-> 0,
(String aggKey, Integer newValue, Integer aggValue)->{
Integer val = aggValue+newValue;
return val;
},
Materialized.<String,Integer,WindowStore<Bytes,byte[]>>as("GROUPING.WINDOW").withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer())
);
groupedMetrics.toStream().map((key,value)->KeyValue.pair(key.key(),value)).to(OUTPUT_TOPIC);
return builder.build();
}
我期望在这个测试用例中,除非我将墙钟时间推进10秒,否则不会向输出主题返回任何内容…但是我得到了以下输出
java.lang.AssertionError: expected null, but was:<ProducerRecord(topic=OUTPUT.TOPIC, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=k, value=2, timestamp=0)>
我有点不明白这里是怎么回事?我正在使用kafka 2.0.0。
更新
提前感谢您的帮助。
根据Matthias的回复,我已经准备好了以下测试:
@Test
public void testTopologyNoCorrelation() throws IOException {
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), new IntegerSerializer());
testDriver.pipeInput(factory.create(INPUT_TOPIC,"k",2,1L));
testDriver.pipeInput(factory.create(INPUT_TOPIC,"k",2,1L));
// Testing 2+2=4
ProducerRecord<String, Integer> outputRecord1 = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new IntegerDeserializer());
Assert.assertEquals(Integer.valueOf(4),outputRecord1.value());
// Testing no more events in the window
ProducerRecord<String, Integer> outputRecord2 = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new IntegerDeserializer());
Assert.assertNull(outputRecord2);
}
两个输入消息使用了相同的时间戳,因此我期望输出主题中只有一个事件,其值为我的值之和。然而,我收到了 2 个输出事件(第一个值为 2,第二个值为 4),我认为这不是拓扑结构的期望行为。
TopologyTestDriver
在处理每个记录后都会提交(包括刷新KTable状态存储缓存)。当您针对集群运行时,Kafka Streams默认每30秒才提交一次,因此连续的更新会被“去重”。如果禁用缓存,则可以为两者获得相同的行为。请参阅https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html进行比较。 - Matthias J. Sax