Kafka - 使用高级消费者实现延迟队列

30

想要使用高级消费者API实现延迟消费者

主要思路:

  • 按键生成消息(每个消息包含创建时间戳),这确保每个分区的消息都按照生成时间排序。
  • auto.commit.enable=false(将在每个消息处理后明确提交)
  • 消费一条消息
  • 检查消息时间戳并检查是否已经过了足够的时间
  • 处理消息(此操作永远不会失败)
  • 提交1个偏移量

while (it.hasNext()) {
  val msg = it.next().message()
  //checks timestamp in msg to see delay period exceeded
  while (!delayedPeriodPassed(msg)) { 
     waitSomeTime() //Thread.sleep or something....
  }
  //certain that the msg was delayed and can now be handled
  Try { process(msg) } //the msg process will never fail the consumer
  consumer.commitOffsets //commit each msg
}

对于这个实现,存在一些问题:

  1. 每次提交位移可能会减慢ZK的速度
  2. 消费者.commitOffsets是否会抛出异常?如果是,我将会重复消费同一条消息(可以通过幂等消息来解决)
  3. 在不提交位移的情况下等待很长时间会存在问题,例如延迟期为24小时,会从迭代器中获取下一个消息,等待24小时后处理并提交(ZK会话超时?)
  4. 如何在不提交新的位移的情况下保持ZK会话的活性?(设置hive.zookeeper.session.timeout.ms可以解决未识别的死亡消费者)
  5. 还有其他我忽略的问题吗?

谢谢!


  1. 从0.8.2版本开始,您可以向kafka提交偏移量(尽管zk仍然被广泛使用)。
  2. 是的,这是一个根本性的问题(考虑到“精确一次处理”)。
  3. 如果您的zk会话过期了(如果您的消费者组中有很多消费者,则消息可能会从原始消费者重新平衡)。坦率地说,如果您每天只有1条消息,那么kafka似乎并不适合。
- om-nom-nom
我不打算在24小时内运行。我会检查它的提交时间(这是消息的一部分),并检查当前时间,以查看是否已经过去了24小时。这样它就不会像病毒那样“传播”,而是会被消费掉。我该如何设置会话不过期? - Nimrod007
1
有一个名为zookeeper.session.timeout.ms的参数,默认设置为6秒,但将其设置为极端值似乎是对技术的滥用(zk将无法跟踪哪些消费者实际上因此而死亡)。 - om-nom-nom
如果消息不应该在长达24小时的时间内被处理,为什么不在将其放入某些持久存储后立即消耗该消息呢?然后有一个后台任务(可能是一个Akka Actor,因为您正在使用Scala)来检查是否应基于经过的时间处理任何新消息。 - Emil L
@EmilH - 为了不引入更多的复杂性到解决方案中,DB写操作可能会在从kafka消费后失败,akka(jvm)在邮箱中可能有未处理的消息而死亡。但这是一个有效的解决方案。 - Nimrod007
显示剩余4条评论
5个回答

25

有一种方法是使用不同的主题,将所有需要延迟的消息都推送到该主题中。如果所有延迟的消息应在相同的时间延迟后处理,则这将非常简单:

while(it.hasNext()) {
    val message = it.next().message()
    
    if(shouldBeDelayed(message)) {
        val delay = 24 hours
        val delayTo = getCurrentTime() + delay
        putMessageOnDelayedQueue(message, delay, delayTo)
    }
    else {
       process(message)
    }

    consumer.commitOffset()
}

所有普通信息现在都会尽快处理,而需要延迟的信息则被放在另一个主题下。

好处是我们知道延迟主题头部的消息是应该首先处理的,因为它的 delayTo 值最小。因此,我们可以设置另一个消费者来读取头部消息,检查时间戳是否已过期,如果是,则处理消息并提交偏移量。如果没有过期,则不提交偏移量,而是休眠到那个时间:

while(it.hasNext()) {
    val delayedMessage = it.peek().message()
    if(delayedMessage.delayTo < getCurrentTime()) {
        val readMessage = it.next().message
        process(readMessage.originalMessage)
        consumer.commitOffset()
    } else {
        delayProcessingUntil(delayedMessage.delayTo)
    }
}

如果存在不同的延迟时间,您可以将主题按延迟时间进行分区(例如24小时、12小时、6小时)。如果延迟时间比这更动态,则会变得更加复杂。您可以通过引入两个延迟主题来解决。从延迟主题 A 中读取所有消息,并处理其中delayTo值已过期的所有消息。在其他消息中,只需找到最接近delayTo的那一条,然后将它们放在主题B上。等待直到最接近的一个应该被处理并反向执行所有操作,即从主题B中处理消息,并将那些尚未被处理的消息放回主题A

回答您具体的问题(其中一些已在您的问题评论中得到了解答)

  1. 提交每个偏移量可能会使ZK变慢

您可以考虑切换到在Kafka中存储偏移量(从0.8.2开始可用的一项功能,请查看消费者配置中的offsets.storage属性)

  1. consumer.commitOffsets是否会抛出异常? 如果是,我将重复消费相同的消息(可以使用幂等消息解决)

我相信它可以,例如如果它无法与偏移量存储通信。然而,使用幂等消息可以解决此问题,正如您所说的那样。

  1. 长时间等待而不提交偏移量会有问题,例如延迟期为24小时,将从迭代器中获取下一个消息,休眠24小时,处理并提交(ZK会话超时?)

除非消息本身的处理需要超过会话超时时间,否则上述解决方案不会出现问题。

  1. 如何在不提交新偏移量的情况下保持ZK会话活动状态? (设置较长的zookeeper.session.timeout.ms可能会导致死亡的消费者而不认识它)

再次使用上述方法,您不需要设置长会话超时时间。

  1. 我是否忽略了其他问题?

总是有其他问题的;)


感谢您提供详细的答案。为什么要使用 it.peek().message() 而不是 it.next()? - Nimrod007
ConsumerIterator.peek() 继承自 IteratorTemplate,不会改变 ConsumerIterator 中的任何内容。它将始终给出相同的值,直到调用 ConsumerIterator.next() 方法。请参考:https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/IteratorTemplate.scala#L46 和 https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala#L45。简而言之,它不会推进迭代器。 - Emil L
我找不到ConsumerIterator类。它在kafka的最新版本中还存在吗? - alexlipa
@alexlipa 那个特定的类似乎已经被移除了。我能看到它出现在的最新版本是0.7,而目前发布的版本已经更新到2.0,并且中间有很多更新。 - Emil L
3
将监听线程休眠直到延迟时间过去并不是一个好主意。这样会很快耗尽所有的监听线程。 - Rahul kalivaradarajalu

6

使用Tibco EMS或其他JMS队列。它们具有内置的重试延迟。对于你正在做的事情,Kafka可能不是正确的设计选择。


1
这是正确的答案。我很惊讶有多少人认为Kafka是一个通用的消息队列。 - Krzysztof Tomaszewski

2
我建议在这种情况下选择另一种方法。在消费者的主线程中等待时间是没有意义的,这将违反队列使用的反模式。从概念上讲,您需要尽快处理消息,并使队列处于低负载状态。
相反,我建议使用调度程序为您需要延迟的每个消息安排作业。这样,您就可以处理队列并创建异步作业,这些作业将在预定的时间点触发。
使用此技术的缺点是它对保存计划作业的JVM状态敏感。如果该JVM失败,则会丢失计划作业,您不知道任务是否已执行。
然而,有一些调度程序实现可以配置为在群集环境中运行,从而使您免受JVM崩溃的影响。
请查看此Java调度框架:http://www.quartz-scheduler.org/

“安排任务”很难做...这增加了复杂性,但最终会奏效。我正在寻找一些简单的东西。 - Nimrod007
1
使用Tibco EMS或其他JMS队列。它们内置了重试延迟。Kafka可能不是您正在进行的正确设计选择。 - Dhyan
@Nimrod007,我同意。 - softwarevamp

2

0

基于计划的键值列表或其Redis替代品可能是最好的方法。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接