避免Apache Kafka消费者中重复消息的有效策略

76

我已经学习了一个月的Apache Kafka了。但现在我卡在了一个点上。我的使用情况是,在不同的机器上运行两个或更多的消费者进程。我进行了一些测试,向Kafka服务器发布了10,000条消息。然后在处理这些消息时,我关闭了其中一个消费者进程并重新启动了它。消费者将已处理的消息写入文件中。因此,在消耗完成后,文件显示超过10k条消息。所以有些消息被重复了。

在消费者进程中,我已禁用自动提交。消费者手动按批次提交偏移量。例如,如果已将100条消息写入文件,则消费者会提交偏移量。当单个消费者进程运行并崩溃并恢复时,通过这种方式避免了重复。但是当运行多个消费者并且其中一个崩溃并恢复时,它会将重复的消息写入文件。

有没有有效的策略来避免这些重复的消息?


10
单一消费者情况下,我不明白如何避免重复问题。你能帮我理解一下吗? - RaGe
6个回答

54
简而言之,不行。
您正在寻找的是准确一次性处理。虽然看起来很可行,但永远不能依赖它,因为总会有注意事项。
即使要尝试避免重复,您也需要使用简单的消费者。这种方法的工作原理是对于每个消费者,从某个分区消耗消息时,将已消耗消息的分区和偏移量写入磁盘。当消费者在故障后重新启动时,从磁盘中读取每个分区的上次消费偏移量。
但是,即使使用此模式,消费者也无法保证不会在故障后重新处理消息。如果消费者消耗消息,然后在偏移量刷新到磁盘之前失败怎么办?如果在处理消息之前写入偏移量,那么如果在实际处理消息之前失败,怎么办?即使在每条消息后将偏移量提交给ZooKeeper,这个问题仍然存在。
然而,在某些情况下,准确一次性处理更容易实现,但仅适用于特定用例。这只需要在与单元应用程序输出相同的位置存储偏移量即可。例如,如果您编写了一个计算消息计数的消费者,通过在每个计数中存储上次计数偏移量,您可以保证偏移量与消费者状态同时存储。当然,为了确保准确一次处理,这将要求您每次仅消耗一条消息并更新每个消息的状态一次,这对于大多数Kafka消费者应用程序来说是完全不切实际的。基于性能原因,Kafka以批次方式消耗消息。
通常,如果您简单地设计为幂等,则可以更好地利用时间并使应用程序更加可靠。

相较于启用自动提交,我们通过“仅一次”场景获得了什么实际好处?在哪些情况下,这将有所帮助?就像我的情况一样,我将有多个消费者在不同的机器上运行,从具有多个分区的相同主题中消费数据,并且我想消除漏掉消息的可能性,并减少重新平衡期间重复消息的数量。 - john
在我的情况下,接收重复消息是可以的,因为我的系统可以处理它,但我绝不能丢失数据,所以想知道通过手动管理偏移量(在磁盘上或某个数据库中)是否会带来任何好处。 - john

42
以下是关于“仅一次性(exactly-once)”的主题,Kafka FAQ 给出的内容:

如何从Kafka获得精确一次的消息传递?

精确一次语义有两个部分:在数据生产过程中避免重复以及在数据消费过程中避免重复。

有两种方法可以在数据生产过程中获得精确一次语义:

  • 对于每个分区使用单个写入器,每次出现网络错误时检查该分区中的最后一条消息,以查看上次写入是否成功
  • 在消息中包含主键(UUID或其他内容),并在消费者端进行去重。

如果您执行其中一项操作,则Kafka托管的日志将不会有重复。但是,无重复读取取决于消费者的某些协作。如果消费者定期检查其位置,则如果它失败并重新启动,则将从检查点位置重新启动。因此,如果数据输出和检查点不是原子写入的,则也可能在这里获得重复。这个问题特定于您的存储系统。例如,如果您正在使用数据库,则可以在事务中提交这些内容。LinkedIn编写的HDFS加载程序Camus为Hadoop加载执行了类似的操作。另一种不需要事务的选择是将偏移量与已加载的数据一起存储,并使用主题/分区/偏移量组合进行去重。

我认为有两个改进可以使这个过程更容易:

  • 可以通过可选地在服务器上集成对此的支持来自动执行生产者幂等性,并且成本更低。
  • 现有的高级消费者没有暴露偏移量的更多细粒度控制(例如,重置您的位置)。我们将很快解决这个问题

在我的情况下,接收到重复的消息是可以接受的,因为我的系统可以处理它,但我绝不能丢失任何数据,所以想知道通过手动管理磁盘上或某个数据库中的偏移量是否会带来任何好处。 - john

27

我同意RaGe对消费者端的去重处理。我们使用Redis对Kafka消息进行去重。

假设Message类有一个名为“uniqId”的成员变量,由生产者填充,保证唯一性。我们使用一个12位长度的随机字符串。(正则表达式为'^[A-Za-z0-9]{12}$')

消费者端使用Redis的SETNX方法进行去重,使用EXPIRE自动清理过期的键值。示例代码:

Message msg = ... // eg. ConsumerIterator.next().message().fromJson();
Jedis jedis = ... // eg. JedisPool.getResource();
String key = "SPOUT:" + msg.uniqId; // prefix name at will
String val = Long.toString(System.currentTimeMillis());
long rsps = jedis.setnx(key, val);
if (rsps <= 0) {
    log.warn("kafka dup: {}", msg.toJson()); // and other logic
} else {
    jedis.expire(key, 7200); // 2 hours is ok for production environment;
}

当Kafka(版本0.8.x)发生故障时,上述代码多次检测到重复消息。但通过我们的输入/输出平衡审计日志,没有消息丢失或重复。


这种方法在重试的情况下不起作用,您会将它们视为去重,而实际上它们应该是重试。 - rayman
1
如果在jedis.setnx()命令执行后,消费者崩溃/网络失败等,在完成其处理任务之前发生了这种情况怎么办?我想我们应该承担这个小风险吧? - user1955934
你可以使用 Redis 事务。 - oputyk

11

现在在Kafka中有一个相对较新的"事务API",可以让您在处理流时实现精确一次处理。使用事务API,只要您的系统余下部分设计为幂等,就可以构建幂等性。请参见https://www.baeldung.com/kafka-exactly-once


3
只有生产者使用事务API时,消费者才能从此模式中受益。 - Traycho Ivanov

4

无论生产者端做了什么,我们仍然认为从kafka确保恰好一次传递的最好方法是在消费者端处理:

  1. 使用uuid作为Kafka消息键将消息生产到T1主题中
  2. 消费者从T1读取消息,并将其用uuid作为行键写入Hbase
  3. 使用相同的行键从Hbase读取并写入另一个主题T2
  4. 最后的消费者从主题T2中实际消费

0
其他替代方案包括:
  1. 将带有唯一键的消息发送到测试主题
  2. 在消费者端,在处理成功后,将带有唯一键的条目推送到数据库。在消费者ID开始处理之前,检查数据库中的唯一键条目。如果存在,则表示消费者已经处理了该消息,否则执行相应操作。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接