想要使用高级消费者API实现延迟消费者
主要思路:
- 按键生成消息(每个消息包含创建时间戳),这确保每个分区的消息都按照生成时间排序。
- auto.commit.enable=false(将在每个消息处理后明确提交)
- 消费一条消息
- 检查消息时间戳并检查是否已经过了足够的时间
- 处理消息(此操作永远不会失败)
提交1个偏移量
while (it.hasNext()) {
val msg = it.next().message()
//checks timestamp in msg to see delay period exceeded
while (!delayedPeriodPassed(msg)) {
waitSomeTime() //Thread.sleep or something....
}
//certain that the msg was delayed and can now be handled
Try { process(msg) } //the msg process will never fail the consumer
consumer.commitOffsets //commit each msg
}
对于这个实现,存在一些问题:
- 每次提交位移可能会减慢ZK的速度
- 消费者.commitOffsets是否会抛出异常?如果是,我将会重复消费同一条消息(可以通过幂等消息来解决)
- 在不提交位移的情况下等待很长时间会存在问题,例如延迟期为24小时,会从迭代器中获取下一个消息,等待24小时后处理并提交(ZK会话超时?)
- 如何在不提交新的位移的情况下保持ZK会话的活性?(设置hive.zookeeper.session.timeout.ms可以解决未识别的死亡消费者)
- 还有其他我忽略的问题吗?
谢谢!
zookeeper.session.timeout.ms
的参数,默认设置为6秒,但将其设置为极端值似乎是对技术的滥用(zk将无法跟踪哪些消费者实际上因此而死亡)。 - om-nom-nom