Kafka消费者手动偏移提交。

6

我的一个用例包括使用数据,执行一些操作,并将其生成到新主题。

我正在使用npm库https://www.npmjs.com/package/kafkajs

我想在成功操作后手动提交偏移量以避免任何数据丢失。 我正在使用autoCommit: false来避免消费后自动提交数据。

这是手动提交偏移量的代码:

consumer.commitOffsets([
  { topic: 'topic-A', partition: 0, offset: '1' }
])

我在某处读到,如果我们有意地提交每个偏移量(在消费后立即提交偏移量),那么这将给代理带来负担,不利于操作。

我需要Kafka专家的建议,针对我的上述用例,建议最佳方法以避免任何数据丢失?请给予建议。


我从 https://github.com/tulios/kafkajs/issues/540 找到了一些解决方法。 - Alongkorn
1个回答

1
为了手动处理提交,以下是代码。
await consumer.run({
  autoCommit: false,
  eachMessage: async ({ topic, partition, message }) => {
    ...
    await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }]);
  },
});

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接