到目前为止,我还没有看到一个Python客户端明确实现创建主题(topic)的,而不使用配置选项自动创建主题。
到目前为止,我还没有看到一个Python客户端明确实现创建主题(topic)的,而不使用配置选项自动创建主题。
您可以使用 kafka-python
或 confluent_kafka
客户端来以编程方式创建主题,其中 librdkafka 是一个轻量级包装器。
使用 kafka-python
from kafka.admin import KafkaAdminClient, NewTopic
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:9092",
client_id='test'
)
topic_list = []
topic_list.append(NewTopic(name="example_topic", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
使用 confluent_kafka
。from confluent_kafka.admin import AdminClient, NewTopic
admin_client = AdminClient({
"bootstrap.servers": "localhost:9092"
})
topic_list = []
topic_list.append(NewTopic("example_topic", 1, 1))
admin_client.create_topics(topic_list)
confluent_kafka
(版本号不低于v0.11.6
),那么以下是如何创建kafka主题
、列出kafka主题
和删除kafka主题
的方法:>>> import confluent_kafka.admin, pprint
>>> conf = {'bootstrap.servers': 'broker01:9092'}
>>> kafka_admin = confluent_kafka.admin.AdminClient(conf)
>>> new_topic = confluent_kafka.admin.NewTopic('topic100', 1, 1)
# Number-of-partitions = 1
# Number-of-replicas = 1
>>> kafka_admin.create_topics([new_topic,]) # CREATE (a list(), so you can create multiple).
{'topic100': <Future at 0x7f524b0f1240 state=running>} # Stdout from above command.
>>> pprint.pprint(kafka_admin.list_topics().topics) # LIST
{'topic100' : TopicMetadata(topic100, 1 partitions),
'topic99' : TopicMetadata(topic99, 1 partitions),
'topic98' : TopicMetadata(topic98, 1 partitions)}
使用相同的kafka_admin
对象来删除Kafka主题
,可以这样做:
kafka_admin.delete_topics(['topic99', 'topic100',]) # DELETE
希望这有所帮助。\(◠﹏◠)/
confluent_kafka
库而不是kafka-python
库,因为前者是对librdkafka C/C++
库的“薄包装”(引用Confluent文献),因此性能更好。虽然,公平地说,kafka-python
更符合Python风格,两个库都很好用。 - NYCeyesprint('hello')
语句进行检查。 - NYCeyes看起来你可以使用以下方法来确保你的主题已经存在(我假设你正在使用以下的kafka python实现):
client = KafkaClient(...)
producer = KafkaProducer(...)
client.ensure_topic_exists('my_new_topic')
producer.send_messages('my_new_topic', ...)
ensure_topic_exists
只在启用了自动主题创建时才有效。https://github.com/mumrah/kafka-python/blob/cd81cf0ec8c1b7e7651374c5d1cbd105d003d352/kafka/client.py#L305-L306 - zackdeverfrom kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
topic = 'topic-name'
producer.send(topic, final_list[0]).get(timeout=10)
final_list
是什么? - Boris Tsema现在已经太晚了。我不知道有没有明确创建主题的命令,但以下代码可以创建并添加消息。
我创建了一个Python Kafka生产者:
prod = KafkaProducer(bootstrap_servers='localhost:9092')
for i in xrange(1000):
prod.send('xyz', str(i))
xyz
。当我使用上述方法时,Python-kafka 客户端创建了它并向其中添加了消息。bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
AdminClient API 用于编程式主题创建和配置,该功能在 Kafka 0.11 中新增(最初为 Java)
预计非 Java 客户端库也将逐步添加此功能。请向您使用的 Kafka Python 客户端的作者(有多个)查询 KIP-4 管理协议支持何时会出现在 API 中
from confluent_kafka import Producer producer = Producer({ "bootstrap.servers": "localhost:9092" }) producer.produce("example_topic", key='key1', value='value1') producer.flush()
- user3900576