How to consume JSON messages with kafka-python

6

I am new to Python and have just started working with Kafka. I need to send and consume JSON messages, and I am using kafka-python to communicate with Kafka.

# Producer.py
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    # Serialize each value as UTF-8 encoded JSON.
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('offering_new', {"dataObjectID": "test1"})

# Consumer.py
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')))
consumer.subscribe(['offering_new'])
for message in consumer:
    print(message)

However, I get the following exception on the consumer side:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1111, in __next__
    return next(self._iterator)
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in _message_generator
    for msg in self._fetcher:
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 482, in __next__
    return next(self._iterator)
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 388, in _message_generator
    self._next_partition_records = self._parse_fetched_data(completion)
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 799, in _parse_fetched_data
    unpacked = list(self._unpack_message_set(tp, records))
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 458, in _unpack_message_set
    tp.topic, record.value)
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 492, in _deserialize
    return f(bytes_)
  File "<stdin>", line 1, in <lambda>
  File "/usr/lib/python3.6/json/__init__.py", line 354, in loads
    return _default_decoder.decode(s)
  File "/usr/lib/python3.6/json/decoder.py", line 339, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib/python3.6/json/decoder.py", line 357, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

I am running the code above in the Python shell. Can anyone tell me what I am doing wrong?

Did you first check from the console whether the messages written to the topic are valid JSON? - Nishu Tayal
2 Answers

9
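You have configured your consumer with auto_offset_reset='earliest', so it reads all messages in the topic from the beginning. The JSON decode error suggests that some messages previously produced to the topic were not actually JSON.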
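The failure can be reproduced without a broker, because the value_deserializer is just a function applied to each record's raw bytes. A minimal sketch, assuming the topic still holds an older plain-text record (both payloads below are made up for illustration):

```python
import json

# Same deserializer as in the question's consumer.
deserialize = lambda m: json.loads(m.decode('utf-8'))

# Hypothetical payloads: `good` is what the question's producer writes,
# `bad` stands in for an older non-JSON record left in the topic.
good = json.dumps({"dataObjectID": "test1"}).encode('utf-8')
bad = b'hello'

print(deserialize(good))

try:
    deserialize(bad)
except json.JSONDecodeError as exc:
    error_text = str(exc)
    print(error_text)
```

The second call fails with the same "Expecting value" error as in the traceback, which is consistent with a single non-JSON record being enough to stop the consumer loop.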
Some solutions:
(1) Consume from the tail of the topic instead: auto_offset_reset='latest'
(2) Start a new topic: consumer.subscribe(['offering_new_too'])
(3) Use a more forgiving deserializer:
import json
import logging

log = logging.getLogger(__name__)

def forgiving_json_deserializer(v):
    # Records without a value pass through as None.
    if v is None:
        return None
    try:
        return json.loads(v.decode('utf-8'))
    except json.decoder.JSONDecodeError:
        log.exception('Unable to decode: %s', v)
        return None

KafkaConsumer(value_deserializer=forgiving_json_deserializer, ...)

Hope this helps!

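I ran into the same error, and changing to auto_offset_reset='latest' resolved it. Thank you very much. - Conquistador
If forgiving_json_deserializer returns None, will the consumer continue processing other messages? Is there any reference for this? - y_159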
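On the last comment: as far as I can tell, kafka-python simply uses whatever the deserializer returns as the record's value, so a None result yields a record whose value is None and iteration moves on to the next record. The sketch below illustrates the idea without a broker by applying a forgiving deserializer to a list of made-up raw values; the list and the extra UnicodeDecodeError handling are assumptions added for illustration.

```python
import json
import logging

log = logging.getLogger(__name__)

def forgiving_json_deserializer(v):
    # Records without a value pass through as None.
    if v is None:
        return None
    try:
        return json.loads(v.decode('utf-8'))
    except (json.JSONDecodeError, UnicodeDecodeError):
        # Log the bad payload and signal "undecodable" with None.
        log.exception('Unable to decode: %s', v)
        return None

# Stand-in for record values fetched from a topic (made-up payloads).
raw_values = [
    b'{"dataObjectID": "test1"}',
    b'not json',
    None,
    b'{"dataObjectID": "test2"}',
]

decoded = [forgiving_json_deserializer(v) for v in raw_values]
# Skip records that could not be decoded, as a consumer loop would.
usable = [d for d in decoded if d is not None]
print(usable)
```

In a real consumer loop the equivalent check would be to test message.value for None and continue to the next message.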

1

Make sure you follow the steps below.

pip install kafka-python

Import the following libraries in your Python script:

from kafka import KafkaConsumer
from json import loads

Create the Kafka consumer object as follows:

consumer = KafkaConsumer(
    'spring_test',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')))

Print all messages:
for message in consumer:
    # message.value holds the deserialized JSON payload.
    print(message.value)

Hope this helps as well.

1
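You need to keep processing messages in a while True loop. Also, how do you set it up to continue from the last message read instead of going through all the messages again? - Alfa Bravo
You are not handling the case where a message cannot be decoded as JSON. - y_159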
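On the first comment: iterating over a KafkaConsumer already blocks and waits for new messages unless consumer_timeout_ms is set, so an extra while True loop should not be needed. Resuming from the last position relies on committed offsets, which are stored per consumer group. A minimal configuration sketch, assuming a local broker and a placeholder group name; make_consumer is a hypothetical helper, not part of kafka-python:

```python
# Keyword arguments for kafka.KafkaConsumer; the group name is a placeholder.
consumer_config = {
    'bootstrap_servers': ['localhost:9092'],
    'group_id': 'my-group',           # offsets are committed per consumer group
    'enable_auto_commit': True,       # commit consumed offsets periodically
    'auto_offset_reset': 'earliest',  # used only when the group has no committed offset
}

def make_consumer(topic):
    # Hypothetical helper. The import is deferred so this sketch can be
    # loaded without kafka-python installed.
    from kafka import KafkaConsumer
    return KafkaConsumer(topic, **consumer_config)
```

With a group_id set, a restarted consumer continues from the group's last committed offset, and auto_offset_reset only decides where to start when no committed offset exists.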
