我有一个Kafka主题,我在那里发送位置事件(键=key=user_id,值=value=user_location)。我可以将其作为KStream
读取和处理:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Location> locations = builder
.stream("location_topic")
.map((k, v) -> {
// some processing here, omitted form clarity
Location location = new Location(lat, lon);
return new KeyValue<>(k, location);
});
那很好用,但我想要一个
KTable
,其中包含每个用户的最后已知位置。我该怎么做呢?
我可以通过写入和从中间主题读取来实现它:// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");
// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");
有没有一种简单的方法从
KStream
获取KTable
?这是我第一次使用Kafka Streams编写应用程序,所以我可能会漏掉一些明显的东西。
groupByKey
来构建一个KStream
的KTable
,但是该方法无法解析。您有任何想法可能出了什么问题吗?(我对Java生态系统和Kafka都很陌生) - LetsPlayYahtzeestream.reduceByKey(...)
而不是stream.groupByKey().reduce(...)
。请参阅http://docs.confluent.io/3.1.0/streams/upgrade-guide.html#stream-grouping-and-aggregation。 - Matthias J. Sax0.10.0
版本,而查看的文档是0.10.1
版本的。所以我已经修复了 :) 谢谢 - LetsPlayYahtzee