Kafka/RabbitMQ中的消息确认

3

我们目前有一个可用的RabbitMQ实现,由于数据量较大,我们计划切换到Kafka。

我在一个问题上有疑虑。

在RabbitMQ中,当消费者从队列中消费消息时,消息会进入不同的阶段,即unacked阶段。客户端/消费者需要一些时间来处理消息,处理成功后,它会向队列发送确认信息,然后消息被从队列中删除。如果处理失败,在一定时间内,如果队列没有收到确认信息,则将消息追加到队列末尾。通过这种方式,我们不会丢失任何消息。

根据我对Kafka的一点了解,例如,如果未能成功处理消息100,则偏移量不会增加,但是如果成功处理消息101,则会增加偏移量。所以我丢失了消息100。

是否有一种方法可以保证不会丢失任何消息。


您需要实现一个死信队列(Dead Letter Queue,DLQ)。 - Giorgos Myrianthous
为什么不将Solace视为Rabbit的更接近的消息替代品?它具有类似的API和语义(例如每个消息的确认),但处理更好的数据量。 - Aaron
4个回答

2

Kafka不会从话题中删除消息,除非它达到log.retention.byteslog.retention.hourslog.retention.minuteslog.retention.ms配置之一。因此,如果偏移量增加,则不会丢失先前的消息,您只需更改偏移量以达到所需位置即可。

最初的回答已经涵盖了Kafka如何保留消息和如何避免数据丢失的问题。


管理员,我的消费者没有跟踪记录,我们有数百个客户在消费RabbitMQ的消息。他们会消费并确认每条消息。RabitMQ负责删除成功消费的消息,并将未成功消费的消息重新发布到队列中。现在,如果某个客户端在没有处理一条消息的情况下向前移动了偏移量,该客户端如何再次消费相同的消息,而不需要在客户端端进行更改。 - Koushik Paul

1
我也遇到了同样的问题。简单来说,RabbitMQ会计算每个已发布但未被消费和已发布、已消费但未被确认的消息数量。而Kafka则不会,因此你不能直接使用它,需要自己实现。不过有一些可选方案,例如使用kmq,性能会降低50%左右,可以看看。

https://softwaremill.com/kafka-with-selective-acknowledgments-performance/


0

除非您轮询新消息,否则不会增加您的消息偏移量。因此,您必须关注重新处理您的消息。

如果您想将数据处理结果存储到Kafka集群中,可以使用Kafka的事务特性。这样,您可以支持精确一次交付。所有更改都将被保存,或者它们都不会被存储。

另一种方法是使您的处理场景具有幂等性。您将为Kafka中的每个消息分配一个唯一的ID。当您处理消息时,将ID存储在数据库中。在崩溃后,通过查看数据库来检查您的消息ID是否已经被处理。


0

你应该了解一下Kafka中消息消费的工作原理。这里是官方Kafka文档中消费者部分的链接:https://kafka.apache.org/documentation/#theconsumer

基本上,在Kafka中,只有足够的时间过去后,消息才会被删除,这是使用log.retention.hourslog.retention.minuteslog.retention.ms进行配置的,就像@Amin所说的那样。

在Kafka中,任意数量的消费者可以从任何主题开始消费消息,无论其他消费者是否已经从同一主题消费。Kafka使用存储在Kafka本身中的偏移量来跟踪每个消费者在每个主题/分区上的位置。因此,如果您的消费者需要消费第100条消息,就像您在问题中描述的那样,您可以简单地“倒回”到所需的消息,并再次开始正常消费。无论您以前是否已经消费过它,或者其他消费者是否正在从该主题读取,都没有关系。

来自官方Kafka文档:

消费者可以有意地将偏移量倒回到旧值,并重新消费数据。这违反了队列的常规约定,但事实证明这对许多消费者来说是一个重要的特性。例如,如果消费者代码存在错误,并且在消费了一些消息后发现该错误,则一旦修复了该错误,消费者便可以重新消费那些消息。

1
感谢 @mjuarez,我的情况有点不同,让我解释一下。所以,每个主题只有一个消费者/读者,如果有多个,则是为了并行处理,因此它们将读取/消费不同的消息。但是,当消费者/读者无法处理消息并转发到下一个时,消息可能仍然存在于kafka中,但它将不断增加偏移量,仅读取新消息。因此,失败的那个消息将不会再次被处理。对于RabbitMQ来说很容易,因为它会将消息移回队列。Kafka每个消费者保留一个偏移量,而RabbitMQ则是每个消息。 - Koushik Paul
1
只需将客户端配置为每次读取一条kafka消息,并在成功处理每条消息后提交偏移量,就可以实现与rabbitmq类似的行为(仅在使用主题分区进行分片和多个消费者共享一个消费者组时更快)。它会比Kafka客户端以批处理方式消费消息的正常模式慢一些,但仍然更快。 - Hans Jespersen
此外,Kafka客户端通常针对每个消费者组的每个分区保留一个偏移量。因此,如果一个主题有16个分区和单个消费者组中的4个消费者,则会保留16个偏移量(每个分区一个)。 - Hans Jespersen
@Hans 这是一个可行的解决方案,但最终会使进程变慢。有没有其他的选择,否则我们只能坚持使用旧版的RabbitMq。 - Koushik Paul
1
另一种选择是轮询 Kafka 消息的批次并管理自己的偏移量。如果您必须跳过某个消息,则将其偏移量写入死信队列或外部非 Kafka 系统以供稍后重新处理。 - Hans Jespersen

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接