Kafka Streams - KTable中值的消失问题

4

问题背景

目前我们正在使用 Kafka Streams API(版本 1.1.0)处理来自 Kafka 集群的消息(3 个代理,每个主题有 3 个分区,副本因子为 2)。安装的 Kafka 版本为 1.1.1。

最终用户向我们报告了数据消失的问题。他们报告说突然间他们无法看到任何数据(例如,昨天他们在 UI 中可以看到 n 条记录,第二天早上表格就为空了)。我们检查了特定用户的 changelog 主题,发现情况很奇怪,似乎经过几天的不活动之后(给定的键值对可能数天未更改),changelog 主题中的累计值消失了。

代码

KTable 的组装线:(将事件按“用户名”分组)


@Bean
public KTable<UsernameVO, UserItems> itemsOfTheUser() {
    return streamsBuilder.stream("application-user-UserItems", Consumed.with(Serdes.String(), serdes.forA(UserItems.class)))
                         .groupBy((key, event) -> event.getUsername(),
                                 Serialized.with(serdes.forA(UsernameVO.class), serdes.forA(UserItems.class)))
                         .aggregate(
                                 UserItems::none,
                                 (key, event, userItems) ->
                                         userItems.after(event),
                                 Materialized
                                         .<UsernameVO, UserItems> as(persistentKeyValueStore("application-user-UserItems"))
                                         .withKeySerde(serdes.forA(UsernameVO.class))
                                         .withValueSerde(serdes.forA(UserItems.class)));
}

聚合对象(KTable 值):


public class UserItems {

private final Map<String, Item> items;

public static UserItems none() {
    return new UserItems();
}

private UserItems() {
    this(emptyMap());
}

@JsonCreator
private UserItems(Map<String, Item> userItems) {
    this.userItems = userItems;
}

@JsonValue
@SuppressWarnings("unused")
Map<String, Item> getUserItems() {
    return Collections.unmodifiableMap(items);
}

...
public UserItems after(ItemAddedEvent itemEvent) {
    Item item = Item.from(itemEvent);

    Map<String, Item> newItems = new HashMap<>(items);
    newItems.put(itemEvent.getItemName(), item);
    return new UserItems(newItems);
}

Kafka主题

application-user-UserItems


这个源主题没有问题。它的保留设置为最大值,所有消息始终存在。


application-user-UserItems-store-changelog(压缩。具有默认配置 - 未更改保留或其他内容)


这里是奇怪的部分。我们可以观察到在某些用户的更改日志中,值丢失了:

Offset | Partition |   Key   |  Value  
...........................................  
...  
320         0        "User1" : {"ItemName1":{"param":"foo"}}  
325         0        "User1" : {"ItemName1":{"param":"foo"},"ItemName2":{"param":"bar"}}  
1056        0        "User1" : {"ItemName3":{"param":"zyx"}}  
...    

我们可以看到,首先消息被正确地聚合:处理了Item1,然后将Item2应用于聚合。但是经过一段时间——可能是几天——另一个事件正在处理——基础“User1”键下的值似乎已经丢失,只有Item3存在。
在应用程序中,用户无法一次性删除所有项目并添加另一个项目,用户只能逐个添加或删除项目。因此,如果他删除了ItemName1和ItemName2,然后添加了ItemName3,我们期望在更改日志中看到以下内容:
Offset | Partition |   Key   |  Value   
..............................................  
...  
320         0        "User1" : {"ItemName1":{"param":"foo"}}   
325         0        "User1" : {"ItemName1":{"param":"foo"},"ItemName2":{"param":"bar"}}   
1054        0        "User1" : {"ItemName2":{"param":"bar"}}   
1055        0        "User1" : {}   
1056        0        "User1" : {"ItemName3":{"param":"zyx"}}  

结论

起初我们认为这与变更日志主题保留有关(但我们检查过,只启用了压缩)。

application-user-UserItems-store-changelog  PartitionCount:3    ReplicationFactor:1 Configs:cleanup.policy=compact,max.message.bytes=104857600   
    Topic: application-user-UserItems-store-changelog   Partition: 0    Leader: 0   Replicas: 0 Isr: 0   
    Topic: application-user-UserItems-store-changelog   Partition: 1    Leader: 2   Replicas: 2 Isr: 2   
    Topic: application-user-UserItems-store-changelog   Partition: 2    Leader: 1   Replicas: 1 Isr: 

任何想法或提示将不胜感激。干杯!


很难说。这可能与Serde有关吗? put(key,null)被解释为删除 - 因此,如果由于任何原因在序列化期间返回了null,则记录将被删除。 - Matthias J. Sax
如果在读取时反序列化返回了 null,我们也可以采取相反的方式,通过提供的 Initializer 重新初始化聚合。 - Matthias J. Sax
请注意,默认情况下,缓存可能会“吞掉”对changelog主题的某些写入操作,因此即使是未压缩的部分也可能无法显示所有写入操作。 - Matthias J. Sax
谢谢你的提示。我们扩展了序列化器以记录所有可能删除值的空值,但这不是原因 - 根据日志,它并没有发生。我们正在进一步调查。 - AndreyB
@MatthiasJ.Sax FYI - AndreyB
显示剩余2条评论
1个回答

0
我曾经遇到过你描述的同样问题,看起来这个问题与你的kafka-streams配置有关。 你提到你的“source”主题有以下配置:

每个主题3个代理,每个代理3个分区,副本因子为2

请确保将以下属性(replication.factor)至少设置为2(默认值为1)添加到您的kafka流配置中。
StreamsConfig.REPLICATION_FACTOR_CONFIG [replication.factor]

这也与您所写的相符(更改日志主题的复制因子设置为1)

application-user-UserItems-store-changelog PartitionCount:3 ReplicationFactor:1 Configs:cleanup.policy=compact,max.message.bytes=104857600

因此,我的假设是由于经纪人故障而丢失数据(但由于源主题的复制因子为2,数据应该保留,因此您可以重新处理并填充更改日志主题)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接