Kafka Streams - 如何为 KTable 设置新的键

10

我是Kafka Streams的新手,正在使用1.0.0版本。我想从一个值中为KTable设置一个新的键。

在使用KStream时,可以使用selectKey()方法来完成此操作,如下所示。

kstream.selectKey ((k,v) -> v.newKey)

然而,这种方法在KTable中缺失。唯一的方法是将给定的KTable转换为KStream。对此有什么想法吗?这是否违背了KTable的设计原则?

5个回答

22
如果您想设置一个新的键,您需要重新分组KTable:
KTable newTable = table.groupBy(/*put select key function here*/)
                       .aggregate(...);

由于KTable(与KStream相反)必须具有唯一键,因此需要指定聚合函数来对所有具有相同(新)键的记录进行聚合,以生成单个值。

自Kafka 2.5起,Kafka Streams也支持KStream#toTable()运算符。因此,也可以执行table.toStream()。selectKey(...)。toTable()。使用这两种方法均存在优缺点。

使用toTable()的主要缺点是它将根据新键重新分区输入数据,导致对重新分区主题的交错写入,从而导致乱序数据。尽管第一种方法通过groupBy()使用相同的实现,但使用聚合函数可以帮助您明确解决“冲突”。如果使用toTable()运算符,则会基于重新分区主题的偏移顺序进行“盲目”插入更新操作(实际上类似于其他答案中的代码示例)。

示例:

Key | Value
 A  | (a,1)
 B  | (a,2)

如果您在 a 上重新键入,则输出表格将是以下两者之一(但其中一个未定义):

Key | Value          Key | Value
 a  | 1               a  |  2

对表进行“重新关键”的操作在语义上始终是不明确的。


1
请问你能帮我检查一下我的答案吗?我不知道Kafka Streams API设计的原理,但基于Kafka Streams并行化的方式来看,这个设计听起来很合理。 - yuranos
1
你所说的是正确的。但请注意,你可能会“滥用”聚合步骤,只需应用一个聚合器 (k,v,a) -> v,它只是盲目地选择“最新”的值--这个“最新”的值将在重新分配主题(groupBy()所暗示的)的偏移顺序中,但是由于你所指出的原因,当然,这个顺序将是“非确定性的”... - Matthias J. Sax
一些其他提出的答案不幸地遵循了这种模式,但实际上它是有问题的...(即,非确定性的...) - Matthias J. Sax
@MatthiasJ.Sax 那么到底应该使用哪种模式?我如何确定特定键的最新消息是什么? 而且,我理解得对吗,在第一次重新分区为KTable之后,您无法再保证KTable语义了吗? - fachexot
1
这取决于您的用例和要求。例如,如果您知道来自不同输入分区的两个记录不能映射到相同的键,则使用selectKey().toTable()就可以了。(类似地,如果两个记录“相距很远”,实际上不会发生竞争条件。)-- 使用聚合方法的优点是您可以比较键的当前值和新值,并决定是否应用更新(例如,您可以比较消息的时间戳)。 - Matthias J. Sax
显示剩余3条评论

11

@Matthias的答案引导了我正确的方向,但我认为提供一段代码示例可能会更有帮助。

final KTable<String, User> usersKeyedByApplicationIDKTable = usersKTable.groupBy(
        // First, going to set the new key to the user's application id
        (userId, user) -> KeyValue.pair(user.getApplicationID().toString(), user)
).aggregate(
        // Initiate the aggregate value
        () -> null,
        // adder (doing nothing, just passing the user through as the value)
        (applicationId, user, aggValue) -> user,
        // subtractor (doing nothing, just passing the user through as the value)
        (applicationId, user, aggValue) -> user
);

KGroupedTable aggregate()文档: https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/KGroupedTable.html#aggregate-org.apache.kafka.streams.kstream.Initializer-org.apache.kafka.streams.kstream.Aggregator-org.apache.kafka.streams.kstream.Aggregator-org.apache.kafka.streams.kstream.Materialized-

1
在减法中,你应该返回 null 吗? - Tudor
2
你编写的程序是非确定性的... @Jackson Oliveira 的方法也存在同样的问题:如果有两个上游记录映射到相同的新键,你不知道哪一个会最终出现在表中... - Matthias J. Sax

4
我认为@Matthias所描述的方式不够准确/详细。虽然正确,但这种限制(也存在于ksqlDB CREATE TABLE语法中)的根本原因并不仅仅是键必须在KTable中唯一这个事实。
唯一性本身并没有限制KTables。毕竟,任何底层主题都可以,并且经常包含具有相同键的消息。KTable对此没有问题。它只会强制执行每个键的最新状态。这样做会产生多种后果,包括基于单个输入消息将来自聚合函数的KTable构建到其输出主题中的多条消息...但让我们回到你的问题上来。
所以,KTable需要知道特定键的哪条消息是最后一条消息,这意味着它是该键的最新状态。

Kafka有哪些排序保证?是的,按分区为单位。

当消息重新进行键控时会发生什么?是的,它们将会被重新分配到与输入消息非常不同的分区。

那么,如果在Kafka Streams应用程序中传递的消息重新进行键控会发生什么?

它们将再次分布在不同的分区,但现在是用不同的键,如果您的应用程序被扩展并且有多个任务并行工作,您无法保证新键的最后一条消息实际上是存储在原始主题中的最后一条消息。独立的任务没有这样的协调。他们不能这样做,否则效率会降低。

因此,如果允许此类重新键控,则KTable将失去其主要语义。


它们将再次分布在不同的分区中,但现在使用不同的键。这是什么意思?现在的键与原始键不同(但所有具有相同原始键的消息都具有相同的新键),还是具有相同原始键的消息的键现在不同了? - fachexot
1
“但是所有具有相同原始密钥的消息都具有相同的新密钥” - 如果您正在进行重新密钥操作,这种情况并非如此。该操作的名称说明了一切。想象一下,您可能已经拥有了一个消息= {用户:fachexot}和一个密钥key = 123。根据您想要做什么,也许使用用户名作为密钥甚至完全舍弃原始代理密钥加入一些其他的消息体会更有意义。因此,您可能会得到message = {lastActive: 2021-09-22},key = fachexot作为结果。 - yuranos
我认为这很可能是这种情况:例如,message1={user: fachexot, number: 1},使用key=123,以及message2={user: fachexot, number: 2},使用key=123。重新加密后: message1={user: fachexot, number: 1},使用key=fachexot,以及message2={user: fachexot, number: 2},使用key=fachexot。 - fachexot
1
你的例子跟我的原始陈述一点关系也没有。“它们将再次跨越分区,但现在使用不同的键。” 在你的例子中,这正是发生的事情,因为原始键是123,作为转换的结果它变成了fachexot。但即使想要表明两个消息仍然会最终进入相同的分区,那只是一个特定的例子。它可以是这样,也可以是非常不同的方式。这不是关于一个具体的例子,而是关于Kafka Streams设计的整体概念。 - yuranos
我必须再次为我的情况提问:在上面的例子中,订单可以得到保证吗?当同一分区上具有相同键的两条消息导致同一分区上具有新但再次相同键的两条消息时,我是否可以重新键入而不会出现顺序问题? - fachexot
1
对于这两个事件,您不会遇到顺序问题。但是,存储在源Kafka主题中的其他事件(在原始事件之前或之后)可能会根据转换的整体复杂性以无序的方式到达目标主题。 - yuranos

3

对于正在使用confluent 5.5.+的人来说,有一种方法可以直接从流中提取密钥并将其转换为KTable:

       KTable<String, User> userTable = builder
            .stream("topic_name", Consumed.with(userIdSerde, userSerde))
            .selectKey((key, value) -> key.getUserId())             
            .toTable( Materialized.with(stringIdSerde, userSerde));

更多细节请点击这里


5
虽然这种方法“可行”,但需要考虑一个重要的问题。如果输入表中有两行映射到相同的新键,那么无法保证这两条记录在结果表中的顺序。因此,程序可能是不确定性的。 - Matthias J. Sax

1

@Allen Underwood的代码帮了我,但如果键是自定义Pojo,则需要进行一些更改。因为我得到了类转换异常。以下代码有效。

usersKTable.groupBy((k, v) -> KeyValue.pair(v.getCompositeKey(), v),Grouped.with(compositeKeySerde,valueSerde))
                .aggregate(
                        () -> null,
                        (applicationId, value, aggValue) -> value,
                        (applicationId, value, aggValue) -> value,
                        Materialized.with(compositeKeySerde, valueSerde)
                );

1
发现另一种简单方法,不确定效率如何。将表格转换为流,并使用select key更改键。将此流推送到新主题,然后让表格从新主题中读取。 - Sumeet
1
你编写的程序是非确定性的... @Jackson Oliveira 的方法也存在同样的问题:如果有两个上游记录映射到相同的新键,你不知道哪一个会最终出现在表中... - Matthias J. Sax

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接