我是Kafka Streams的新手,正在使用1.0.0版本。我想从一个值中为KTable设置一个新的键。
在使用KStream时,可以使用selectKey()方法来完成此操作,如下所示。
kstream.selectKey ((k,v) -> v.newKey)
然而,这种方法在KTable中缺失。唯一的方法是将给定的KTable转换为KStream。对此有什么想法吗?这是否违背了KTable的设计原则?
我是Kafka Streams的新手,正在使用1.0.0版本。我想从一个值中为KTable设置一个新的键。
在使用KStream时,可以使用selectKey()方法来完成此操作,如下所示。
kstream.selectKey ((k,v) -> v.newKey)
然而,这种方法在KTable中缺失。唯一的方法是将给定的KTable转换为KStream。对此有什么想法吗?这是否违背了KTable的设计原则?
KTable newTable = table.groupBy(/*put select key function here*/)
.aggregate(...);
由于KTable(与KStream相反)必须具有唯一键,因此需要指定聚合函数来对所有具有相同(新)键的记录进行聚合,以生成单个值。
自Kafka 2.5起,Kafka Streams也支持KStream#toTable()
运算符。因此,也可以执行table.toStream()。selectKey(...)。toTable()
。使用这两种方法均存在优缺点。
使用toTable()
的主要缺点是它将根据新键重新分区输入数据,导致对重新分区主题的交错写入,从而导致乱序数据。尽管第一种方法通过groupBy()
使用相同的实现,但使用聚合函数可以帮助您明确解决“冲突”。如果使用toTable()
运算符,则会基于重新分区主题的偏移顺序进行“盲目”插入更新操作(实际上类似于其他答案中的代码示例)。
示例:
Key | Value
A | (a,1)
B | (a,2)
如果您在 a
上重新键入,则输出表格将是以下两者之一(但其中一个未定义):
Key | Value Key | Value
a | 1 a | 2
对表进行“重新关键”的操作在语义上始终是不明确的。
@Matthias的答案引导了我正确的方向,但我认为提供一段代码示例可能会更有帮助。
final KTable<String, User> usersKeyedByApplicationIDKTable = usersKTable.groupBy(
// First, going to set the new key to the user's application id
(userId, user) -> KeyValue.pair(user.getApplicationID().toString(), user)
).aggregate(
// Initiate the aggregate value
() -> null,
// adder (doing nothing, just passing the user through as the value)
(applicationId, user, aggValue) -> user,
// subtractor (doing nothing, just passing the user through as the value)
(applicationId, user, aggValue) -> user
);
ksqlDB
CREATE TABLE
语法中)的根本原因并不仅仅是键必须在KTable中唯一这个事实。KTables
。毕竟,任何底层主题都可以,并且经常包含具有相同键的消息。KTable
对此没有问题。它只会强制执行每个键的最新状态。这样做会产生多种后果,包括基于单个输入消息将来自聚合函数的KTable
构建到其输出主题中的多条消息...但让我们回到你的问题上来。KTable需要知道特定键的哪条消息是最后一条消息,这意味着它是该键的最新状态。
Kafka有哪些排序保证?是的,按分区为单位。
当消息重新进行键控时会发生什么?是的,它们将会被重新分配到与输入消息非常不同的分区。
那么,如果在Kafka Streams应用程序中传递的消息重新进行键控会发生什么?
它们将再次分布在不同的分区,但现在是用不同的键,如果您的应用程序被扩展并且有多个任务并行工作,您无法保证新键的最后一条消息实际上是存储在原始主题中的最后一条消息。独立的任务没有这样的协调。他们不能这样做,否则效率会降低。
因此,如果允许此类重新键控,则KTable
将失去其主要语义。
对于正在使用confluent 5.5.+的人来说,有一种方法可以直接从流中提取密钥并将其转换为KTable:
KTable<String, User> userTable = builder
.stream("topic_name", Consumed.with(userIdSerde, userSerde))
.selectKey((key, value) -> key.getUserId())
.toTable( Materialized.with(stringIdSerde, userSerde));
更多细节请点击这里
@Allen Underwood的代码帮了我,但如果键是自定义Pojo,则需要进行一些更改。因为我得到了类转换异常。以下代码有效。
usersKTable.groupBy((k, v) -> KeyValue.pair(v.getCompositeKey(), v),Grouped.with(compositeKeySerde,valueSerde))
.aggregate(
() -> null,
(applicationId, value, aggValue) -> value,
(applicationId, value, aggValue) -> value,
Materialized.with(compositeKeySerde, valueSerde)
);
(k,v,a) -> v
,它只是盲目地选择“最新”的值--这个“最新”的值将在重新分配主题(groupBy()
所暗示的)的偏移顺序中,但是由于你所指出的原因,当然,这个顺序将是“非确定性的”... - Matthias J. SaxselectKey().toTable()
就可以了。(类似地,如果两个记录“相距很远”,实际上不会发生竞争条件。)-- 使用聚合方法的优点是您可以比较键的当前值和新值,并决定是否应用更新(例如,您可以比较消息的时间戳)。 - Matthias J. Sax