我需要Kafka消费者日志来进行调试。我按照以下步骤进行:
chanLogs := make(chan confluentkafka.LogEvent)
go func() {
for {
logEv := <-chanLogs
logger.Debug("KAFKA: " + logEv.String())
}
}()
configMap["go.logs.channel.enable"] = true
configMap["go.logs.channel"] = chanLogs
consumer, err := confluentkafka.NewConsumer(&configMap)
err := consumer.SubscribeTopics(Topics, nil)
我从未得到单行。我尝试了使用kafka chan(
consumer.Logs()
)得到相同的结果。我做错了什么?
更新
在最初的帖子中,我错误地设置了参数名称。正确的名称是go.logs.channel.enable
。但有时这仍然无法起作用。
go.logs.channel.enable
等于true
添加到配置映射中。 - Matteo