如何使用Python编写程序来在Apache Kafka中创建主题

55

到目前为止,我还没有看到一个Python客户端明确实现创建主题(topic)的,而不使用配置选项自动创建主题。

7个回答

80

您可以使用 kafka-pythonconfluent_kafka 客户端来以编程方式创建主题,其中 librdkafka 是一个轻量级包装器。


使用 kafka-python

from kafka.admin import KafkaAdminClient, NewTopic


admin_client = KafkaAdminClient(
    bootstrap_servers="localhost:9092", 
    client_id='test'
)

topic_list = []
topic_list.append(NewTopic(name="example_topic", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
使用 confluent_kafka
from confluent_kafka.admin import AdminClient, NewTopic


admin_client = AdminClient({
    "bootstrap.servers": "localhost:9092"
})

topic_list = []
topic_list.append(NewTopic("example_topic", 1, 1))
admin_client.create_topics(topic_list)

请问您能否添加主题配置,例如max.message.bytes=1000000的confluent_kafka示例。 - Rubber Duck
1
使用上述技术,我能够创建主题,我得到的响应是 {'jjd_topic1': <Future at 0x7f7914970940 state=running>},但是当我列出主题时,新主题不在列表中...就好像它没有持久化一样。这里可能发生了什么? - JamesD
1
我也没有看到使用confluent_kafka答案在这里提到kafka的新话题。 - Mayak
1
我也遇到了类似的问题,但一旦你在那个主题上生成一条消息,它就会开始显示出来。from confluent_kafka import Producer producer = Producer({ "bootstrap.servers": "localhost:9092" }) producer.produce("example_topic", key='key1', value='value1') producer.flush() - user3900576
有没有办法使用Kafka默认属性创建主题?我的意思是不需要明确指定分区和副本因子! - sandeep P
@JamesD 我认为你需要等待操作完成。请参考基本的AdminClient示例 https://github.com/confluentinc/confluent-kafka-python#basic-adminclient-example - Winand

11
如果您可以运行Python的confluent_kafka(版本号不低于v0.11.6),那么以下是如何创建kafka主题列出kafka主题删除kafka主题的方法:
>>> import confluent_kafka.admin, pprint

>>> conf        = {'bootstrap.servers': 'broker01:9092'}
>>> kafka_admin = confluent_kafka.admin.AdminClient(conf)

>>> new_topic   = confluent_kafka.admin.NewTopic('topic100', 1, 1)
                  # Number-of-partitions  = 1
                  # Number-of-replicas    = 1

>>> kafka_admin.create_topics([new_topic,]) # CREATE (a list(), so you can create multiple).
    {'topic100': <Future at 0x7f524b0f1240 state=running>} # Stdout from above command.

>>> pprint.pprint(kafka_admin.list_topics().topics) # LIST
    {'topic100' : TopicMetadata(topic100, 1 partitions),
     'topic99'  : TopicMetadata(topic99,  1 partitions),
     'topic98'  : TopicMetadata(topic98,  1 partitions)}

使用相同的kafka_admin对象来删除Kafka主题,可以这样做:

kafka_admin.delete_topics(['topic99', 'topic100',]) # DELETE

希望这有所帮助。\(◠﹏◠)/


请注意,我更倾向于使用confluent_kafka库而不是kafka-python库,因为前者是对librdkafka C/C++库的“薄包装”(引用Confluent文献),因此性能更好。虽然,公平地说,kafka-python更符合Python风格,两个库都很好用。 - NYCeyes
2
当我打开Python shell并逐行输入您的代码时,它可以100%正常工作 - 我创建主题并列出所有主题时,它都在那里。但是,当我将其放入.py文件中并运行该文件时,由于某种原因它不会被创建,尽管它没有给出错误。如果我将文件中的每一行复制并粘贴到Python shell中,则它会再次创建。完全相同的代码在shell中可以工作,但在文件中却不能... - Alfa Bravo
@AlfaBravo 嗯...你确定程序在运行吗?尝试在代码周围插入 print('hello') 语句进行检查。 - NYCeyes
2
是的,我亲身经历过这种情况 - 程序运行,并且客户端返回有关创建的主题的信息 {'jjd_topic1': <Future at 0x7f7914970940 state=running>},但是当我运行列表时,它们并不存在。就好像它们从未被提交一样。为什么会这样呢? - JamesD
1
非常奇怪--@AlfaBravo,我也遇到了同样的问题。如果我将其作为.py文件运行,它不会创建主题,但是如果我进入Python shell并执行它,则会创建主题... - Nicole White

2

看起来你可以使用以下方法来确保你的主题已经存在(我假设你正在使用以下的kafka python实现):

client = KafkaClient(...)
producer = KafkaProducer(...)
client.ensure_topic_exists('my_new_topic')
producer.send_messages('my_new_topic', ...)

4
不行。ensure_topic_exists 只在启用了自动主题创建时才有效。https://github.com/mumrah/kafka-python/blob/cd81cf0ec8c1b7e7651374c5d1cbd105d003d352/kafka/client.py#L305-L306 - zackdever

1
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
topic = 'topic-name'

producer.send(topic, final_list[0]).get(timeout=10)

final_list是什么? - Boris Tsema
@BorisTsema请参考:https://github.com/dpkp/kafka-python/blob/master/kafka/producer/kafka.py#L516这是一个主题值,如果我没记错的话。 这是您想要发送给消费者的数据。 - Mohsin Aljiwala

1

现在已经太晚了。我不知道有没有明确创建主题的命令,但以下代码可以创建并添加消息。

我创建了一个Python Kafka生产者:

prod = KafkaProducer(bootstrap_servers='localhost:9092')
for i in xrange(1000):
    prod.send('xyz', str(i))

在 Kafka 主题列表中之前没有 xyz。当我使用上述方法时,Python-kafka 客户端创建了它并向其中添加了消息。

3
实际上,经纪人创建了该主题,仅因为auto.topic.create.enable设置为“true”。以这种方式创建的所有主题都将具有默认配置,这些配置可能或可能不适合您的使用情况。 - Hans Jespersen

1
似乎没有kafka服务器API可以创建主题,因此您必须使用主题自动创建或命令行工具:
bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test

不,你需要使用AdminClient,根据NYCeyes的评论。 - HenryM

0

AdminClient API 用于编程式主题创建和配置,该功能在 Kafka 0.11 中新增(最初为 Java)

参见https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations

预计非 Java 客户端库也将逐步添加此功能。请向您使用的 Kafka Python 客户端的作者(有多个)查询 KIP-4 管理协议支持何时会出现在 API 中

参见https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接