10得票1回答
kafka-python消费者自动从偏移量开始读取

我正在尝试使用kafka-python构建一个应用程序,其中消费者从一系列主题中读取数据。非常重要的是,消费者永远不会重复读取相同的消息,但也永远不会错过任何消息。 除了在关闭消费者(例如故障)并尝试从偏移量开始读取时出现问题之外,一切似乎都很正常。我只能读取主题中的所有消息 (这会导致重复...

10得票1回答
KafkaTimeoutError: 在60.0秒后更新元数据失败。

我有一个高吞吐量的Kafka生产者用例,想要每秒推送数千个JSON消息。 我有一个3节点的Kafka集群,并且正在使用最新的kafka-python库,以下是我用于生成消息的方法。 def publish_to_kafka(topic): data = get_data(topic...

9得票5回答
Kafka消费者:如何在Python中从最后一条消息开始消费

我正在使用Kafka 0.8.1和Kafka python-0.9.0。在我的设置中,我有两个Kafka代理。当我运行我的Kafka消费者时,我可以看到它从队列中检索消息并跟踪两个代理的偏移量。一切都很好! 我的问题是,当我重新启动消费者时,它会从头开始消费消息。我原先期望的是,在重新启动后...

9得票3回答
使用Kafka-Python的反序列化器无法消费来自Kafka的JSON消息

我正在尝试通过Kafka发送一个非常简单的JSON对象,并使用Python和kafka-python从另一侧读取它。然而,我不断看到以下错误: 2017-04-07 10:28:52,030.30.9998989105:kafka.future:8228:ERROR:10620:Error ...

8得票3回答
如何强制消费者在kafka中读取特定分区?

我有一个应用程序,可以从由1个Kafka生产者生成的URL流中下载特定的Web内容。我创建了一个具有5个分区的主题,并有5个Kafka消费者。但是网页下载的超时时间为60秒。当其中一个URL正在下载时,服务器会认为消息已丢失并将数据重新发送给不同的消费者。 我已经尝试了Kafka消费者配置/...

8得票1回答
在任何配置参数值下,Kafka是否保证单个分区内的消息顺序?

如果我在生产者设置Kafka配置参数为: 1. retries = 3 2. max.in.flight.requests.per.connection = 5 如果是这样,那么一个分区内的消息可能不会按照发送顺序排序。 Kafka 是否采取额外措施确保一个分区内的消息仅按照发送顺序...

8得票1回答
kafka-python中的多进程处理

我一直在使用python-kafka模块从kafka代理中消费数据。我希望可以并行地从具有“x”个分区的同一主题中进行消费。文档中提到: # Use multiple consumers in parallel w/ 0.9 kafka brokers # typically you wou...

8得票1回答
尝试连接Kafka时出现“无可用代理”错误。

我在尝试使用Python客户端连接本地的Kafka 0.10.0.0时遇到了一个非常奇怪的问题,我的操作系统是CentOS。 我的连接选项非常简单且默认: kafka_consumer = kafka.KafkaConsumer( bootstrap_servers=['l...

8得票3回答
Kafka的produce.send从未发送消息

我正在使用 Kafka 2.12 和 kafka-python 模块作为 Kafka 客户端。我尝试测试一个简单的生产者: class Producer(Process): daemon = True def run(self): producer = KafkaProducer(b...

8得票2回答
如何从Python客户端将JSON对象发送到Kafka

I have a simple JSON object like the following d = { 'tag ': 'blah', 'name' : 'sam', 'score': {'row1': 100, 'row2': 200 } } ...