Kafka Streams可以配置等待KTable加载吗?

11

我正在使用materialized KTable与我的KStream进行左连接(其中stream是左侧)。

然而,它似乎立即处理,而不等待KTable的当前版本加载完毕。

我的源主题中有很多值用于KTable,并且当我启动应用程序时,许多连接失败(好吧,实际上不是因为这是一个左连接)。

我能否延迟启动以等待初始主题加载?

2个回答

13

Kafka Streams中的处理是时间同步的,因此基于记录时间戳顺序来处理表输入主题和流输入主题。这在语义上是合理的,因为在流-表连接中,您不希望将流记录与旧版本或新版本的KTable连接,而是根据流记录时间戳连接正确的版本。

如果数据没有适当地标记时间戳,则可以尝试通过builder.table(...,Consumed.with(...))指定自定义时间戳提取器,以返回确保正确行为的时间戳(即,可能小于第一个流记录的时间戳?)

请注意,正确的时间戳同步要求Kafka Streams 2.1。较旧的版本仅以最佳努力方式同步时间,可能无法提供所需的行为。有关详细信息,请参见KIP-353。

Kafka 3.0 带来了更多的时间戳同步改进: https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization


1
实际上,我确实想加入最新版本的KTable。有什么方法可以做到这一点吗?如果您使用的是Kafka Streams 2.1或更高版本,则可以为KTable使用自定义时间戳提取器,该提取器始终返回0作为时间戳。这样,您就可以获得不同步的行为,并且KTable更新会立即应用。请注意,不同步处理使您的应用程序固有地不确定性,因此您无法应用时间旅行来重现先前的结果。 - Matthias J. Sax
尝试了您建议使用零时间戳的方法,它在新版本的kafka中有效!请将其添加到您的答案中,因为这是解决方案。 - Ben Yaakobi
@MatthiasJ.Sax 非常感谢您提供的宝贵见解。我目前也遇到了同样的情况(Kafka 2.3.0)。我正在尝试从主题A和B中读取,并从主题B创建一个表(在对来自B流的一些关键转换之后)。我想确保B先于A被读取,以便我不会“错过”连接...因此,在B消息中将时间戳设置为0(使用时间戳提取器)理论上可以工作吗?还是我漏掉了什么? - nsanglar
1
使用自定义的TimestampExtractor并让它对所有记录返回0,基本上就可以模仿在启动时引导的GlobalKTable的行为。这应该是可行的。 - Matthias J. Sax
1
这取决于您是否仅对最新结果感兴趣,或者还想重放旧结果 - 如果要重放旧结果,则需要正确打时间戳的数据。对于最新结果,如果您将所有记录的时间戳归零,则是正确的(因为连接最终是一致的)。但在流式场景中,“最新”结果的概念总是有点模糊,因为它可能会不断改变... - 对于流-表连接,这个“问题”更加重要;而对于表-表连接则不然。 - Matthias J. Sax
显示剩余7条评论

2
你可以使用GlobalKTable。它会等待所有值同步。最初的回答中提到了这个方法。

2
使用GlobalKTable会改变程序的语义。它不是KTable的一对一替代品。 - Matthias J. Sax

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接