我们发现Java Kafka消费者出现了意外的重新平衡,如下所述。这些问题是否有人熟悉?有没有关于API或调试技术的提示来确定重新平衡的原因?
有时候
我们使用
重新平衡是间歇性的,很难复现。它们发生在每秒10,000到80,000个消息的速率下。我们在日志中没有看到明显的错误。
我们的读取循环非常简单 - 基本上是“当运行时,使用超时和错误处理进行轮询,然后将接收到的消息加入队列”。
人们问了一些相关的好问题,但答案并没有帮助我们: 配置:
- 两个进程正在读取一个主题。有时主题上的所有分区都会重新平衡到单个读取进程。重新启动两个进程后,分区会平均分配。
- 两个进程正在读取一个主题。有时一长串的重新平衡会将分区从读取者反弹到读取者。我们对消费者进行暂停/恢复以进行背压,应该可以防止这种情况。
- 两个进程正在读取一个主题。有时候当看起来两个进程都在正常读取时会发生重新平衡。之后,读取工作正常,但处理中出现了小问题。
有时候
poll()
会卡住(超过超时时间),我们使用wakeup()
和close()
,然后创建新的消费者。有时协调器心跳线程在消费者关闭后仍然运行(我们见过成千上万次)。时间似乎与重新平衡无关,因此重新平衡似乎是一个单独的问题,但也许心跳正在遇到未记录的网络问题。我们使用
ConsumerRebalanceListener
来记录和处理某些重新平衡,但Kafka API似乎不会公开有关重新平衡原因的数据。重新平衡是间歇性的,很难复现。它们发生在每秒10,000到80,000个消息的速率下。我们在日志中没有看到明显的错误。
我们的读取循环非常简单 - 基本上是“当运行时,使用超时和错误处理进行轮询,然后将接收到的消息加入队列”。
人们问了一些相关的好问题,但答案并没有帮助我们: 配置:
- Kafka 0.10.1.0 (我们已经开始尝试1.0.0,但还没有测试结果)
- Java 8代理和客户端
- 2个代理,1个Zookeeper,稳定运行的进程并且没有增加
- 5个主题,其中2个主题有一定的活动量。重新平衡发生在繁忙的主题(主题“ A”)上。
- 主题A有16个分区和复制2,并且是在消费者启动之前创建的。
- 一个进程写入主题A; 两个进程从主题A读取。
- 每个读取进程运行16个消费者。当16个分区平均平衡时,一些消费者处于空闲状态。
- 消费者线程在轮询之间做很少的工作。消息处理在与消费者不同的线程上异步进行。
- 所有主题A的消费者都在同一个消费者组中。
KafkaConsumer.poll()
的超时时间为1000毫秒。影响重新平衡的配置是:
max.poll.interval.ms = 50000
max.poll.records = 100
request.timeout.ms = 40000
session.timeout.ms = 20000
我们对这些使用默认值:
heartbeat.interval.ms = 3000
- (broker)
group.max.session.timeout.ms = 300000
- (broker)
group.min.session.timeout.ms = 6000