如何在Golang中创建一个Kafka消费者组?

6
一个可用的库是sarama(或其扩展sarama-cluster),但没有提供消费者组示例,不在sarama中,也不在sarama-cluster中。我不理解API,请给我一个创建主题消费者组的示例?
2个回答

20

无需使用sarama-cluster库。它已经被弃用,不再适用于Apache Kafka集成。原始的Sarama库本身提供了一种使用消费者组连接到Kafka集群的方式。

我们需要创建客户端,然后初始化消费者组,在其中创建声明(claims)并等待消息通道接收消息。

初始化客户端:

kfversion, err := sarama.ParseKafkaVersion(kafkaVersion) // kafkaVersion is the version of kafka server like 0.11.0.2
if err != nil {
    log.Println(err)
}

config := sarama.NewConfig()
config.Version = kfversion
config.Consumer.Return.Errors = true

// Start with a client
client, err := sarama.NewClient([]string{brokerAddr}, config)
if err != nil {
    log.Println(err)
}
defer func() { _ = client.Close() }()

消费者组连接:

// Start a new consumer group
group, err := sarama.NewConsumerGroupFromClient(consumer_group, client)
if err != nil {
    log.Println(err)
}
defer func() { _ = group.Close() }()

开始从主题分区消费消息:

// Iterate over consumer sessions.
ctx := context.Background()
for {
    topics := []string{topicName}
    handler := &Message{}
    err := group.Consume(ctx, topics, handler)
    if err != nil {
        log.Println(err)
    }
}

最后一步是等待消息通道消费消息。我们需要实现所有函数(三个)来实现ConsumerGroupHandler接口。

func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error   { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
        sess.MarkMessage(msg, "")
    }
    return nil
}

查看有关使用Go语言的Kafka更多信息,请检查sarama库。


除非您卡在Kafka 0.10.1上,否则您仍然需要使用sarama-cluster。 - vinni_f
如果我在Kubernetes部署中的Pod上部署此程序,我能安全地进行扩展吗? - Li Ziyan
是的 @LiZiyan,您可以将生产者扩展到群集中的多个 Pod。 - Himanshu

2
消费者组由集群消费者“构造函数”的第二个参数指定。以下是一个非常基本的示例:
消费者组由集群消费者“构造函数”的第二个参数指定。以下是一个非常基本的示例:
import (
    "github.com/Shopify/sarama"
    "github.com/bsm/sarama-cluster"
)

conf := cluster.NewConfig()
// add config values

brokers := []string{"kafka-1:9092", "kafka-2:9092"}
group := "Your-Consumer-Group"
topics := []string{"topicName"}
consumer := cluster.NewConsumer(broker, group, topics, conf)

那么您将拥有属于指定用户组的消费者。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接