使用Kafka Streams测试窗口聚合

6

我正在使用Kafka Streams的TopologyTestDriver来测试我们的数据管道。这与所有简单的拓扑结构都很奏效,包括使用存储器的有状态拓扑结构。

但是当我尝试使用此测试驱动程序来测试使用窗口聚合的拓扑结构时,就出现了问题。

我复制了一个简单的示例,它将在10秒窗口内对接收到的相同键的整数求和。

public class TopologyWindowTests {

TopologyTestDriver testDriver;
String INPUT_TOPIC = "INPUT.TOPIC";
String OUTPUT_TOPIC = "OUTPUT.TOPIC";

@Before
public void setup(){
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    // EventProcessor is a <String,String> processor
    // so we set those serders
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
    testDriver = new TopologyTestDriver(defineTopology(),config,0L);
}

/**
 * topology test
 */
@Test
public void testTopologyNoCorrelation() throws IOException {
    ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), new IntegerSerializer());
    testDriver.pipeInput(factory.create(INPUT_TOPIC,"k",2,1L));

    ProducerRecord<String, Integer> outputRecord = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new IntegerDeserializer());

    Assert.assertNull(outputRecord);
}

@After
public void tearDown() {
    testDriver.close();
}

/**
 * Defines topology
 * @return
 */
public Topology defineTopology(){
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String,Integer> inputStream = builder.stream(INPUT_TOPIC);

    KTable<Windowed<String>, Integer> groupedMetrics = inputStream.groupBy((key,value)->key,
            Serialized.with(Serdes.String(),Serdes.Integer())).windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10))).aggregate(
            ()-> 0,
            (String aggKey, Integer newValue, Integer aggValue)->{
                Integer val = aggValue+newValue;
                return val;
            },
            Materialized.<String,Integer,WindowStore<Bytes,byte[]>>as("GROUPING.WINDOW").withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer())
    );

    groupedMetrics.toStream().map((key,value)->KeyValue.pair(key.key(),value)).to(OUTPUT_TOPIC);

    return builder.build();

}

我期望在这个测试用例中,除非我将墙钟时间推进10秒,否则不会向输出主题返回任何内容…但是我得到了以下输出

java.lang.AssertionError: expected null, but was:<ProducerRecord(topic=OUTPUT.TOPIC, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=k, value=2, timestamp=0)>

我有点不明白这里是怎么回事?我正在使用kafka 2.0.0。

更新

提前感谢您的帮助。

根据Matthias的回复,我已经准备好了以下测试:

@Test
public void testTopologyNoCorrelation() throws IOException {
    ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), new IntegerSerializer());
    testDriver.pipeInput(factory.create(INPUT_TOPIC,"k",2,1L));
    testDriver.pipeInput(factory.create(INPUT_TOPIC,"k",2,1L));

    // Testing 2+2=4
    ProducerRecord<String, Integer> outputRecord1 = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new IntegerDeserializer());
    Assert.assertEquals(Integer.valueOf(4),outputRecord1.value());

    // Testing no more events in the window
    ProducerRecord<String, Integer> outputRecord2 = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new IntegerDeserializer());
    Assert.assertNull(outputRecord2);
}

两个输入消息使用了相同的时间戳,因此我期望输出主题中只有一个事件,其值为我的值之和。然而,我收到了 2 个输出事件(第一个值为 2,第二个值为 4),我认为这不是拓扑结构的期望行为。

1个回答

4
默认情况下,Kafka Streams 在窗口操作中使用事件时间(event-time),而非墙钟时间(wall-clock-time)。这保证了确定性处理语义(墙钟时间处理本质上是不确定的)。有关详细信息,请查看文档:https://docs.confluent.io/current/streams/concepts.html#time
因此,您输入记录的时间戳决定了记录放在哪个窗口中。此外,您输入记录的时间戳会推进内部跟踪的“流时间”,该时间基于这些事件时间戳。
还要注意,Kafka Streams 遵循连续处理模型,并发出更新后的结果,而不是等待窗口结束条件。这对于处理迟到(也称为乱序)数据非常重要。请参阅如何发送时间窗口化 KTable 的最终 kafka-streams 聚合结果?https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/进行比较。 更新 这是由于“更新”处理模型。当您聚合时,每个输入记录都会更新“当前”结果,并产生一个“当前结果输出记录”。这适用于每个记录(而不是每个时间戳)。

1
感谢Matthias的快速回复。我添加了一个新的测试,展示当我向输入主题发送具有相同时间戳的2个事件时,测试驱动程序的行为。我期望在输出中只有这两个值的总和,但是我得到了2个事件...你能解释一下吗? - David O
@DavidO 扩展了我的答案 - Matthias J. Sax
1
我理解您的观点,但是当对接Kafka集群时,我看到的行为与此不同。在这种情况下,无论我发送了多少条记录,在每个窗口中我只能看到一个输出事件。因此,我的观点是有些东西没有像应该那样运行,因为针对测试驱动程序和针对Kafka集群的行为是不同的。您可以在这里找到整个项目:https://github.com/davidonoro/ks-streaming-example - David O
2
不同之处在于TopologyTestDriver在处理每个记录后都会提交(包括刷新KTable状态存储缓存)。当您针对集群运行时,Kafka Streams默认每30秒才提交一次,因此连续的更新会被“去重”。如果禁用缓存,则可以为两者获得相同的行为。请参阅https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html进行比较。 - Matthias J. Sax
所以根据最后一条评论,使用拓扑测试驱动程序无法准确测试窗口流拓扑结构? - bad robot
为什么不呢?你仍然可以测试它。 - Matthias J. Sax

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接