方法1:
只有在您能够估计消息处理时间的情况下,才适用此方法,否则不建议使用。
问题:此方法的主要问题是保持消费者的活动状态。如果您在调用poll()之前等待消息完全处理完成,则必须确保您的消费者在调用poll()之前保持活动状态,因为Kafka维护了一个名为“session.timeout.ms”的属性。Kafka代理/集群对该属性的值采取行动,如果消费者无法在“session.timeout.ms”时间段内再次调用poll(),则代理将标记消费者死亡并将其踢出。现在,当消费者完成消息处理并再次调用poll()时,它被视为新加入者,并且会再次从与之前相同的偏移量开始给出记录集。考虑到这种情况,消费者将陷入永久循环中,永远无法处理其偏移量。
可能的解决方案1:要使用此方法,您需要一个好的以下属性值:“session.timeout.ms”,具有以下副作用:
1:值过低:如上所述,消费者将被标记为死亡,并且永远无法处理其偏移量,但是消息将被处理,但每次完成消息时,它都会再次获取先前的消息+新消息。
2:值过高:代理在检测到消费者的真正故障时会非常晚,这将导致记录重复,并影响总吞吐量。
可能的解决方案2:(仅适用于版本0.10.1.x) Kafka在发布(0.10.1.0)中的官方修复方法。在此方法中,引入了两个值得注意的实体:一个新属性“max.poll.interval.ms”,该属性设置客户端调用poll()之间的最大延迟时间,并且负责保持消费者活动状态的后台线程。因此,在一种情况下,当消费者调用方法poll()并忙于消息处理时,内部后台线程将保持心跳存活,因此消费者将保持活动状态。但是,此内部后台线程本身将保持活动状态,直到属性“max.poll.interval.ms”的超时值仍然有效。因此,如果消费者在“max.poll.interval.ms”时间段值内没有调用poll(),则该线程将等待消费者,并发送离开请求并自行终止。
再次强调这个解决方案中的棘手部分是找到合适的此属性值:“max.poll.interval.ms”(非常重要,这个时间将是后台线程在不需要显式调用poll()的情况下保持心跳存活的时间)。
方法2:
使用工作线程是一个好主意,但是您需要维护内部队列或接收到的消息的验证,这可能很复杂,而且您需要使用手动提交来代替自动提交。有关提交的更多信息,请参见
此处并搜索标题“提交和偏移量”。
问题: 在这种方法中,主要问题是跟踪已接收和成功处理的消息。当您的消费者接收到消息时,它将把消息传递给相应的工作线程,并提交偏移量以继续接收更多消息。在此过程中,您必须注意以下问题:
- 如果已接收并提交了偏移量但由于某些原因工作线程无法处理该消息,则该怎么办?
- 如果消费者接收到消息但没有空闲的工作线程来处理该怎么办?
解决方案: 可以采用不同的方法来解决上述问题,其中一种方法是使用内部队列来保存消息和手动提交,只有当工作线程报告消息成功处理时才会发送提交。但需要非常小心地实现,因为它可能导致代码复杂以及内存管理或线程问题。
建议: 根据您的要求,您可以使用一种方法或另一种方法,并实现可能出现的问题的固定。然而,我建议更强大的解决方案是使用分区暂停/恢复。以非常抽象的方式,您的消费者应执行以下步骤:
1:轮询()以获取消息。
2:暂停所有相关主题/分区。
3:将消息分配给工作线程并等待其处理。
4:不断调用轮询(),但是由于已暂停分区,因此在消费者保持活动状态的同时不会收到额外的消息。 (确保在此时不注册新主题)
5:如果所有工作线程都应报告消息处理成功/失败,则相应地提交偏移量。
6:恢复所有分区。
注:根据您的情况和要求,可能存在更好的方法或其他解决方案。这只是一个想法或可能的解决方案。