我正在使用materialized KTable与我的KStream进行左连接(其中stream是左侧)。
然而,它似乎立即处理,而不等待KTable的当前版本加载完毕。
我的源主题中有很多值用于KTable,并且当我启动应用程序时,许多连接失败(好吧,实际上不是因为这是一个左连接)。
我能否延迟启动以等待初始主题加载?
我正在使用materialized KTable与我的KStream进行左连接(其中stream是左侧)。
然而,它似乎立即处理,而不等待KTable的当前版本加载完毕。
我的源主题中有很多值用于KTable,并且当我启动应用程序时,许多连接失败(好吧,实际上不是因为这是一个左连接)。
我能否延迟启动以等待初始主题加载?
Kafka Streams中的处理是时间同步的,因此基于记录时间戳顺序来处理表输入主题和流输入主题。这在语义上是合理的,因为在流-表连接中,您不希望将流记录与旧版本或新版本的KTable
连接,而是根据流记录时间戳连接正确的版本。
如果数据没有适当地标记时间戳,则可以尝试通过builder.table(...,Consumed.with(...))
指定自定义时间戳提取器,以返回确保正确行为的时间戳(即,可能小于第一个流记录的时间戳?)
请注意,正确的时间戳同步要求Kafka Streams 2.1。较旧的版本仅以最佳努力方式同步时间,可能无法提供所需的行为。有关详细信息,请参见KIP-353。
Kafka 3.0 带来了更多的时间戳同步改进: https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization
GlobalKTable
会改变程序的语义。它不是KTable
的一对一替代品。 - Matthias J. Sax
TimestampExtractor
并让它对所有记录返回0
,基本上就可以模仿在启动时引导的GlobalKTable
的行为。这应该是可行的。 - Matthias J. Sax