在我们的
尝试找出 kafkajs 是否提供任何有趣的内容,但没有发现什么特别有趣的信息(payload 中最有趣的信息是:
问题:
有什么想法可以记录由
我是否应该考虑实现自己的统计监视器来收集 node 应用程序中使用
新细节 1:
我可以像下一个一样获取信息:
nest.js
应用程序中,我们使用 kafkajs 客户端处理 kafka 消息。我们需要监控统计信息,其中一个指标是 lag
。尝试找出 kafkajs 是否提供任何有趣的内容,但没有发现什么特别有趣的信息(payload 中最有趣的信息是:
timestamp
、offset
、batchContext.firstOffset
、batchContext.firstTimestamp
和 batchContext.maxTimestamp
)。问题:
有什么想法可以记录由
kafkajs
提供的 lag
值和其他统计信息吗?我是否应该考虑实现自己的统计监视器来收集 node 应用程序中使用
kafka.js
客户端所需的信息?新细节 1:
根据文档,我可以获取batch.highWatermark
,其中
batch.highWatermark
是主题分区中最后一个已提交的偏移量。它对于计算滞后可能有用。
尝试中...
await consumer.run({
eachBatchAutoResolve: true,
eachBatch: async (data) => {
console.log('Received data.batch.messages: ', data.batch.messages)
console.log('Received data.batch.highWatermark: ', data.batch.highWatermark)
},
})
我可以像下一个一样获取信息:
Received data.batch.messages: [
{
magicByte: 2,
attributes: 0,
timestamp: '1628877419958',
offset: '144',
key: null,
value: <Buffer 68 65 6c 6c 6f 21>,
headers: {},
isControlRecord: false,
batchContext: {
firstOffset: '144',
firstTimestamp: '1628877419958',
partitionLeaderEpoch: 0,
inTransaction: false,
isControlBatch: false,
lastOffsetDelta: 2,
producerId: '-1',
producerEpoch: 0,
firstSequence: 0,
maxTimestamp: '1628877419958',
timestampType: 0,
magicByte: 2
}
},
{
magicByte: 2,
attributes: 0,
timestamp: '1628877419958',
offset: '145',
key: null,
value: <Buffer 6f 74 68 65 72 20 6d 65 73 73 61 67 65>,
headers: {},
isControlRecord: false,
batchContext: {
firstOffset: '144',
firstTimestamp: '1628877419958',
partitionLeaderEpoch: 0,
inTransaction: false,
isControlBatch: false,
lastOffsetDelta: 2,
producerId: '-1',
producerEpoch: 0,
firstSequence: 0,
maxTimestamp: '1628877419958',
timestampType: 0,
magicByte: 2
}
},
{
magicByte: 2,
attributes: 0,
timestamp: '1628877419958',
offset: '146',
key: null,
value: <Buffer 6d 6f 72 65 20 6d 65 73 73 61 67 65 73>,
headers: {},
isControlRecord: false,
batchContext: {
firstOffset: '144',
firstTimestamp: '1628877419958',
partitionLeaderEpoch: 0,
inTransaction: false,
isControlBatch: false,
lastOffsetDelta: 2,
producerId: '-1',
producerEpoch: 0,
firstSequence: 0,
maxTimestamp: '1628877419958',
timestampType: 0,
magicByte: 2
}
}
]
Received data.batch.highWatermark: 147
有没有关于如何在标签计算中使用 batch.highWatermark
的想法?