Kafka Streams API:从KStream到KTable

38

我有一个Kafka主题,我在那里发送位置事件(键=key=user_id,值=value=user_location)。我可以将其作为KStream读取和处理:

KStreamBuilder builder = new KStreamBuilder();

KStream<String, Location> locations = builder
        .stream("location_topic")
        .map((k, v) -> {
            // some processing here, omitted form clarity
            Location location = new Location(lat, lon);
            return new KeyValue<>(k, location);
        });

那很好用,但我想要一个 KTable ,其中包含每个用户的最后已知位置。我该怎么做呢? 我可以通过写入和从中间主题读取来实现它:
// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");

// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");

有没有一种简单的方法从KStream获取KTable?这是我第一次使用Kafka Streams编写应用程序,所以我可能会漏掉一些明显的东西。
1个回答

34

更新:

在 Kafka 2.5 中,将添加一个新方法 KStream#toTable(),它提供了一种方便的方式将 KStream 转换成 KTable。详情请参见:https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL

原始回答:

目前没有直接的方法可以实现这个功能。你的方法是完全正确的,正如 Confluent 常见问题解答中所讨论的那样:http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step

就代码而言,这是最简单的方法。但它的缺点是(a)你需要管理一个额外的主题,并且(b)会产生额外的网络流量,因为数据被写入并重新从 Kafka 读取。

还有一种替代方案,使用“虚拟缩减”:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> stream = ...; // some computation that creates the derived KStream

KTable<String, Long> table = stream.groupByKey().reduce(
    new Reducer<Long>() {
        @Override
        public Long apply(Long aggValue, Long newValue) {
            return newValue;
        }
    },
    "dummy-aggregation-store");
这种方法在代码方面比选项1要复杂一些,但有一个优点,就是(a)不需要手动主题管理,(b)不需要重新从Kafka读取数据。总的来说,你需要自己决定哪种方法更好: 在选项2中,Kafka Streams将创建一个内部changelog主题以备份KTable以实现容错性。因此,这两种方法都需要在Kafka中进行一些额外的存储,并导致额外的网络流量。总体上,这是在选项2中略微复杂的代码与选项1中的手动主题管理之间的权衡。

我正在尝试使用您的方法,通过进行愚蠢的groupByKey来构建一个KStreamKTable,但是该方法无法解析。您有任何想法可能出了什么问题吗?(我对Java生态系统和Kafka都很陌生) - LetsPlayYahtzee
4
你的Streams版本是什么?对于旧版本,应该使用stream.reduceByKey(...)而不是stream.groupByKey().reduce(...)。请参阅http://docs.confluent.io/3.1.0/streams/upgrade-guide.html#stream-grouping-and-aggregation。 - Matthias J. Sax
1
我以为我在使用最新版本,但实际上我使用的是 0.10.0 版本,而查看的文档是 0.10.1 版本的。所以我已经修复了 :) 谢谢 - LetsPlayYahtzee
2
使用您的“虚拟缩减”功能,您如何在生成的ktable中删除一个条目? 我的理解是,缩减将简单地忽略任何空值。更新:我看到您在另一个线程上的评论,指出使用“代理”,这确实是我过去所做的。https://dev59.com/hlUL5IYBdhLWcg3wM1k6 - AFrieze
虽然上述选项中哪一个更倾向于最佳实践? - Mujtaba Faizi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接