问题背景
目前我们正在使用 Kafka Streams API(版本 1.1.0)处理来自 Kafka 集群的消息(3 个代理,每个主题有 3 个分区,副本因子为 2)。安装的 Kafka 版本为 1.1.1。
最终用户向我们报告了数据消失的问题。他们报告说突然间他们无法看到任何数据(例如,昨天他们在 UI 中可以看到 n 条记录,第二天早上表格就为空了)。我们检查了特定用户的 changelog 主题,发现情况很奇怪,似乎经过几天的不活动之后(给定的键值对可能数天未更改),changelog 主题中的累计值消失了。
代码
KTable 的组装线:(将事件按“用户名”分组)
@Bean
public KTable<UsernameVO, UserItems> itemsOfTheUser() {
return streamsBuilder.stream("application-user-UserItems", Consumed.with(Serdes.String(), serdes.forA(UserItems.class)))
.groupBy((key, event) -> event.getUsername(),
Serialized.with(serdes.forA(UsernameVO.class), serdes.forA(UserItems.class)))
.aggregate(
UserItems::none,
(key, event, userItems) ->
userItems.after(event),
Materialized
.<UsernameVO, UserItems> as(persistentKeyValueStore("application-user-UserItems"))
.withKeySerde(serdes.forA(UsernameVO.class))
.withValueSerde(serdes.forA(UserItems.class)));
}
聚合对象(KTable 值):
public class UserItems {
private final Map<String, Item> items;
public static UserItems none() {
return new UserItems();
}
private UserItems() {
this(emptyMap());
}
@JsonCreator
private UserItems(Map<String, Item> userItems) {
this.userItems = userItems;
}
@JsonValue
@SuppressWarnings("unused")
Map<String, Item> getUserItems() {
return Collections.unmodifiableMap(items);
}
...
public UserItems after(ItemAddedEvent itemEvent) {
Item item = Item.from(itemEvent);
Map<String, Item> newItems = new HashMap<>(items);
newItems.put(itemEvent.getItemName(), item);
return new UserItems(newItems);
}
Kafka主题
application-user-UserItems
这个源主题没有问题。它的保留设置为最大值,所有消息始终存在。
application-user-UserItems-store-changelog(压缩。具有默认配置 - 未更改保留或其他内容)
这里是奇怪的部分。我们可以观察到在某些用户的更改日志中,值丢失了:
Offset | Partition | Key | Value
...........................................
...
320 0 "User1" : {"ItemName1":{"param":"foo"}}
325 0 "User1" : {"ItemName1":{"param":"foo"},"ItemName2":{"param":"bar"}}
1056 0 "User1" : {"ItemName3":{"param":"zyx"}}
...
我们可以看到,首先消息被正确地聚合:处理了Item1,然后将Item2应用于聚合。但是经过一段时间——可能是几天——另一个事件正在处理——基础“User1”键下的值似乎已经丢失,只有Item3存在。
在应用程序中,用户无法一次性删除所有项目并添加另一个项目,用户只能逐个添加或删除项目。因此,如果他删除了ItemName1和ItemName2,然后添加了ItemName3,我们期望在更改日志中看到以下内容:
Offset | Partition | Key | Value
..............................................
...
320 0 "User1" : {"ItemName1":{"param":"foo"}}
325 0 "User1" : {"ItemName1":{"param":"foo"},"ItemName2":{"param":"bar"}}
1054 0 "User1" : {"ItemName2":{"param":"bar"}}
1055 0 "User1" : {}
1056 0 "User1" : {"ItemName3":{"param":"zyx"}}
结论
起初我们认为这与变更日志主题保留有关(但我们检查过,只启用了压缩)。
application-user-UserItems-store-changelog PartitionCount:3 ReplicationFactor:1 Configs:cleanup.policy=compact,max.message.bytes=104857600
Topic: application-user-UserItems-store-changelog Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: application-user-UserItems-store-changelog Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: application-user-UserItems-store-changelog Partition: 2 Leader: 1 Replicas: 1 Isr:
任何想法或提示将不胜感激。干杯!
Serde
有关吗?put(key,null)
被解释为删除 - 因此,如果由于任何原因在序列化期间返回了null
,则记录将被删除。 - Matthias J. Saxnull
,我们也可以采取相反的方式,通过提供的Initializer
重新初始化聚合。 - Matthias J. Sax