Kafka 消费者 - 具有更高优先级的主题

3
我正在使用Kafka Consumer从多个主题中读取数据,我希望其中一个主题具有更高的优先级。处理需要很长时间,并且通常有许多(低优先级)主题中的消息,但我希望来自另一个主题的消息尽快被处理。
这与Kafka是否支持主题或消息优先级类似,但此问题使用旧API。
在新的API(0.10.1.1)中,有以下方法:
KafkaConsumer::pause(Collection)
KafkaConsumer::resume(Collection)

但我不清楚如何有效地检测高优先级主题中是否有新消息,并且需要暂停从其他主题进行消费。

有什么想法/示例吗?


1
你可以检查你正在监控的分区的endOffsets是否大于这些分区的最后提交offsets。具体实现方式可能会有所不同,但这样做可以让你知道在轮询之前是否有更多的消息需要消费。 - dawsaw
请看一下这个,它可能是你正在寻找的内容: https://stackoverflow.com/a/66013251/4602706 - Marco Vargas
2个回答

7

最终,我解决了这个问题,就像dawsaw建议的那样-在处理循环中,我为我从所有主题/分区读取的内容存储:

  • beginningOffsets
  • endOffsets
  • committed - 我不能使用position,因为我订阅的是主题,而不是分区。

每当任何优先级主题的 (endOffset-committed) > 0, 我会暂停非优先级主题并在所有优先级主题的(endOffset-committed) == 0 后再次恢复它们。


请问您能分享一下解决这个问题的策略吗?假设我们有(总共10 Gbs)低优先级消息和少量高优先级消息。我们有多个消费者和多个生产者。即使我们暂停了消费者,为了实现您的想法,我们还需要暂停所有其他主题的生产者。对吗?您是否有相关经验,因为在100个服务和数十个主题的生态系统中,这似乎几乎是不可能的?- 是的,我已经阅读了您关于此事的其他相关问题。谢谢。 - JSBach
不需要暂停任何生产者 - 这个想法是你有一个单一的消费者订阅了几个主题(其中一些主题是高优先级的,其他是普通优先级的)。在轮询新消息之前,您需要检查高优先级主题的滞后情况。如果这些滞后中有任何一个非零,则意味着您需要暂停订阅普通优先级主题,以免消费者“偷走”时间。在处理完所有来自高优先级主题的消息后,您可以再次恢复普通优先级的主题。 - miran
谢谢。我不能完全反驳。但对于更大的系统来说,这样做会有很多问题。一旦大量数据的大门打开,我将不得不时不时地检查是否在使用低优先级队列时浪费资源。为什么要这样做呢?对吧。无论如何,再次感谢。 - JSBach
@mian,您是否知道Python客户端是否有类似的Kafka实现? - activelearner
我相信Python客户端提供了与Java客户端相同的API - 因此您绝对可以实现它... - miran
@miran 我很好奇你是否仍在使用这种方法。你有没有想过它的可扩展性?我想实现类似的东西,但是不想将阈值设置为0,而是想将其设置为可配置的值...可能设置为更高的数字,比如500或1000。我还想考虑使用3个主题:高、中、低优先级。我只是担心暂停/恢复操作的性能问题。你有什么想法吗? - grt3kl

3

我想你可以结合position()和committed()方法来实现。position()方法获取下一个将被获取的记录的偏移量,而committed()方法获取给定分区的最后一个已提交的偏移量(如文档中所述)。 在轮询低优先级之前,您可以检查高优先级的position()和committed()。如果position()高于committed(),则可以暂停低优先级并在高优先级上进行poll(),然后恢复低优先级。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接