Kafka Stream 时间窗口计数不报告零值

7
我正在使用Kafka Streams来计算在一个滚动时间窗口内发生了多少事件,该时间窗口为3分钟:
public class ViewCountAggregator {

    void buildStream(KStreamBuilder builder) {      

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        KStream<String, String> views = builder.stream(stringSerde, stringSerde, "streams-view-count-input");
        KStream<String, Long> viewCount = views
            .groupBy((key, value) -> value)
            .count(TimeWindows.of(TimeUnit.MINUTES.toMillis(3)).advanceBy(TimeUnit.MINUTES.toMillis(1)))
            .toStream()
            .map((key, value) -> new KeyValue<>(key.key(), value));

        viewCount.to(stringSerde, longSerde, "streams-view-count-output");        
    }

    public static void main(String[] args) throws Exception {                   
        // some not so important initialization code
        ...  
    }

}

当运行消费者并将一些消息推送到输入主题时,随着时间的推移,它会接收到以下更新:
single  1
single  1
single  1
five    1
five    4
five    5
five    4
five    1

几乎是正确的,但它从未接收以下更新:
single  0
five    0

没有它,我的消费者在较长一段时间内没有事件时将永远无法将计数器归零。我期望消耗的消息看起来像这样:
single  1
single  1
single  1
single  0
five    1
five    4
five    5
five    4
five    1
five    0

有没有一些我忽略的配置选项/参数可以帮助我实现这种行为?
1个回答

11
这段话涉及到的计算输出基本正确,但它永远不会接收以下更新:
首先,计算输出是正确的。
其次,为什么它是正确的:
如果应用窗口聚合,只有那些实际上存在内容的窗口才会被创建(我熟悉的所有其他系统都会产生相同的输出)。因此,如果对于某个键,没有数据的时间段长于窗口大小,则不会实例化任何窗口,因此也根本没有计数。
不实例化没有内容的窗口的原因很简单:处理器无法知道所有键。在您的示例中,有两个键,但以后可能会出现第三个键。您是否希望从一开始就获得?另外,由于数据流具有无限性质,键可能会消失而永远不会重新出现。如果记住所有已看到的键,并在不存在某个键的数据时发出,您会永远发出吗?
我不想说你期望的结果/语义没有意义。这只是你非常特定的用例,不适用于一般情况。因此,流处理器不予实现。
第三点:你可以做什么?
有多种选择:
1. 您的消费者可以跟踪它看到了什么键,并使用嵌入的记录时间戳来确定某个键是否“丢失”,然后为此键将计数器设置为零(对于此操作,删除map步骤并保留Windowed<K>类型以便让消费者得知记录属于哪个窗口可能有所帮助)。 2. 在您的流应用程序中添加一个有状态的#transform()步骤,执行与(1)中描述的相同的操作。为此,注册一个标点回调可能会有所帮助。

使用方法(2)可以更容易地跟踪键,因为您可以将状态存储附加到转换步骤中,因此不需要在下游消费者中处理状态(以及故障/恢复)。

然而,两种方法的棘手之处仍然是决定何时缺少键,即需要等待多长时间才能生成<key,0>。请注意,数据可能会迟到(也称为乱序),即使您发出了<key,0>, 迟到的记录可能会在您的代码发出<key,0>记录后产生<key,1>消息。但是,如果您似乎只使用最新窗口,则这可能并不是您的问题。

最后还有一个评论:看起来您只使用最新计数,并且新窗口覆盖旧窗口,因此值得探索“交互式查询”直接访问您的count运算符状态,而无需消耗主题并更新其他状态。这可能会大大简化您的下游应用程序的重新设计。请查看文档和一篇非常好的关于交互式查询的博客文章以获取更多详细信息。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接