Kafkajs - 获取统计信息(滞后)

3
在我们的 nest.js 应用程序中,我们使用 kafkajs 客户端处理 kafka 消息。我们需要监控统计信息,其中一个指标是 lag
尝试找出 kafkajs 是否提供任何有趣的内容,但没有发现什么特别有趣的信息(payload 中最有趣的信息是:timestampoffsetbatchContext.firstOffsetbatchContext.firstTimestampbatchContext.maxTimestamp)。
问题:
有什么想法可以记录由 kafkajs 提供的 lag 值和其他统计信息吗?
我是否应该考虑实现自己的统计监视器来收集 node 应用程序中使用 kafka.js 客户端所需的信息?
新细节 1:

根据文档,我可以获取batch.highWatermark,其中

batch.highWatermark是主题分区中最后一个已提交的偏移量。它对于计算滞后可能有用。

尝试中...

  await consumer.run({
    eachBatchAutoResolve: true,
    eachBatch: async (data) => {
      console.log('Received data.batch.messages: ', data.batch.messages)
      console.log('Received data.batch.highWatermark: ', data.batch.highWatermark)
    },
  })

我可以像下一个一样获取信息:
Received data.batch.messages:  [
  {
    magicByte: 2,
    attributes: 0,
    timestamp: '1628877419958',
    offset: '144',
    key: null,
    value: <Buffer 68 65 6c 6c 6f 21>,
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '144',
      firstTimestamp: '1628877419958',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 2,
      producerId: '-1',
      producerEpoch: 0,
      firstSequence: 0,
      maxTimestamp: '1628877419958',
      timestampType: 0,
      magicByte: 2
    }
  },
  {
    magicByte: 2,
    attributes: 0,
    timestamp: '1628877419958',
    offset: '145',
    key: null,
    value: <Buffer 6f 74 68 65 72 20 6d 65 73 73 61 67 65>,
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '144',
      firstTimestamp: '1628877419958',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 2,
      producerId: '-1',
      producerEpoch: 0,
      firstSequence: 0,
      maxTimestamp: '1628877419958',
      timestampType: 0,
      magicByte: 2
    }
  },
  {
    magicByte: 2,
    attributes: 0,
    timestamp: '1628877419958',
    offset: '146',
    key: null,
    value: <Buffer 6d 6f 72 65 20 6d 65 73 73 61 67 65 73>,
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '144',
      firstTimestamp: '1628877419958',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 2,
      producerId: '-1',
      producerEpoch: 0,
      firstSequence: 0,
      maxTimestamp: '1628877419958',
      timestampType: 0,
      magicByte: 2
    }
  }
]
Received data.batch.highWatermark:  147

有没有关于如何在标签计算中使用 batch.highWatermark 的想法?


batch.highwatermark是当前分区上最后提交的消息的偏移量(即您已经确认读取的内容)。通过将其从message.offset中减去,您可以将其与批处理大小进行比较。差异越大,则滞后越大。您可能需要为每个分区运行一个清单。https://kafka.js.org/docs/consuming - aggaton
2个回答

1

看起来获取偏移量延迟指标的唯一方法是使用仪器事件

consumer.on(consumer.events.END_BATCH_PROCESS, (payload) =>
  console.log(payload.offsetLagLow),
);

offsetLagLow用于衡量批处理中第一条消息与分区的最后偏移量(highWatermark)之间的偏移差。您也可以使用offsetLag,但它基于批处理的最后偏移量。

正如@Sergii提到的,当您使用eachBatch时,会直接提供一些可用的props(这里batch prop的所有可用方法)。但如果您使用eachMessage,则无法获取该props。因此,仪表事件是最通用的方法。


0

这是我们用来计算每个客户端/组/主题/分区滞后的代码。

import { Kafka } from 'kafkajs'
async function lag (clientConfig) { 
    let status = []
    const kafkaClient = new Kafka(clientConfig)
    const admin = kafkaClient.admin()
    await admin.connect()
    const groups = await admin.listGroups()
    const groupsNames = await groups.groups.map(x => x.groupId)
    const gd = await admin.describeGroups(groupsNames)
    let currentMapOfTopicOffsetByGroupId = {}
    for (const g of gd.groups) {
        const topicOffset = await admin.fetchOffsets({ groupId: g.groupId })
        if (currentMapOfTopicOffsetByGroupId[g.groupId] == undefined) {
            currentMapOfTopicOffsetByGroupId[g.groupId] = {}    
        }
        topicOffset.forEach(to => {
            to.partitions.forEach(p => {
                if (currentMapOfTopicOffsetByGroupId[g.groupId][to.topic] == undefined) {
                    currentMapOfTopicOffsetByGroupId[g.groupId][to.topic] = {}
                }
                currentMapOfTopicOffsetByGroupId[g.groupId][to.topic][parseInt(p.partition)] = p.offset     
            })
        })
        
        for (const m of g.members) {    
            const memberMetadata = AssignerProtocol.MemberMetadata.decode(m.memberMetadata)
            const memberAssignment = AssignerProtocol.MemberAssignment.decode(m.memberAssignment)
            for (const t of memberMetadata.topics) {

                const res = await admin.fetchTopicOffsets(t)    

                res.forEach(r => {
                    const lag = parseInt(r.high) - parseInt(currentMapOfTopicOffsetByGroupId[g.groupId][t][parseInt(r.partition)])
                    if (currentMapOfTopicOffsetByGroupId[g.groupId][t] !== undefined) {
                        status.push({
                            HOST: m.clientHost,
                            STATE: g.state,
                            MEMBER_ID: m.memberId,
                            GROUP_ID: g.groupId,
                            TOPIC: t,
                            PARTITION: r.partition,
                            OFFSET: r.offset,
                            C_OFFSET: parseInt(currentMapOfTopicOffsetByGroupId[g.groupId][t][parseInt(r.partition)]),
                            LAG: lag
                        })
                    }
                }) 
            }
        }
    }
    return status
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接