Unable to consume JSON messages from Kafka using Kafka-Python's deserializer


I'm trying to send a very simple JSON object through Kafka and read it on the other side using Python and kafka-python. However, I keep seeing the following error:

2017-04-07 10:28:52,030.30.9998989105:kafka.future:8228:ERROR:10620:Error processing callback
Traceback (most recent call last):
  File "C:\Anaconda2\lib\site-packages\kafka\future.py", line 79, in _call_backs
    f(value)
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 760, in _handle_fetch_response
    unpacked = list(self._unpack_message_set(tp, messages))
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 539, in _unpack_message_set
    tp.topic, msg.value)
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 570, in _deserialize
    return f(bytes_)
  File "C:\Users\myUser\workspace\PythonKafkaTest\src\example.py", line 55, in <lambda>
    value_deserializer=lambda m: json.loads(m).decode('utf-8'))
  File "C:\Anaconda2\lib\json\__init__.py", line 339, in loads
    return _default_decoder.decode(s)
  File "C:\Anaconda2\lib\json\decoder.py", line 364, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "C:\Anaconda2\lib\json\decoder.py", line 382, in raw_decode
    raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded

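I did some research and found that the most common cause of this error is malformed JSON. To rule that out, I added the following to my code to print the JSON before sending it, and the JSON prints without any errors.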

while True:
    json_obj1 = json.dumps({"dataObjectID": "test1"})
    print(json_obj1)
    producer.send('my-topic', {"dataObjectID": "test1"})
    producer.send('my-topic', {"dataObjectID": "test2"})
    time.sleep(1)

This makes me suspect that I can produce the JSON but not consume it.
Here is my code:
import threading
import logging
import time
import json

from kafka import KafkaConsumer, KafkaProducer


class Producer(threading.Thread):
    daemon = True

    def run(self):
        producer = KafkaProducer(bootstrap_servers='localhost:9092',
                                 value_serializer=lambda v: json.dumps(v).encode('utf-8'))

        while True:
            producer.send('my-topic', {"dataObjectID": "test1"})
            producer.send('my-topic', {"dataObjectID": "test2"})
            time.sleep(1)


class Consumer(threading.Thread):
    daemon = True

    def run(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 value_deserializer=lambda m: json.loads(m).decode('utf-8'))
        consumer.subscribe(['my-topic'])

        for message in consumer:
            print (message)


def main():
    threads = [
        Producer(),
        Consumer()
    ]

    for t in threads:
        t.start()

    time.sleep(10)

if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:' +
               '%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )
    main()

If I remove both value_serializer and value_deserializer, I can send and receive strings successfully. When I run the code that way, I can see the JSON I sent. Here is a short excerpt:

ConsumerRecord(topic=u'my-topic', partition=0, offset=5742, timestamp=None, timestamp_type=None, key=None, value='{"dataObjectID": "test1"}', checksum=-1301891455, serialized_key_size=-1, serialized_value_size=25)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5743, timestamp=None, timestamp_type=None, key=None, value='{"dataObjectID": "test2"}', checksum=-1340077864, serialized_key_size=-1, serialized_value_size=25)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5744, timestamp=None, timestamp_type=None, key=None, value='test', checksum=1495943047, serialized_key_size=-1, serialized_value_size=4)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5745, timestamp=None, timestamp_type=None, key=None, value='\xc2Hello, stranger!', checksum=-1090450220, serialized_key_size=-1, serialized_value_size=17)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5746, timestamp=None, timestamp_type=None, key=None, value='test', checksum=1495943047, serialized_key_size=-1, serialized_value_size=4)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5747, timestamp=None, timestamp_type=None, key=None, value='\xc2Hello, stranger!', checksum=-1090450220, serialized_key_size=-1, serialized_value_size=17)

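So I tried removing only the value_deserializer from the consumer. The code then runs, but without a deserializer the messages come out as strings, which is not what I need. Why doesn't value_deserializer work? Is there another way to get JSON out of Kafka messages?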
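One way to see why this lambda fails is to call it directly on raw values like those in the output above, with no broker involved. The sketch below assumes Python 3.6+ (where json.loads accepts bytes) and uses a hypothetical helper named failure to capture the exception. It shows two separate problems: a valid JSON record parses into a dict, which has no .decode() method, while a non-JSON record such as 'test' fails inside json.loads itself. Since the consumer uses auto_offset_reset='earliest', it may also read the older non-JSON records still in the topic, which would be consistent with the ValueError in the traceback.

```python
import json

# The consumer deserializer from the question, reproduced as-is.
bad_deserializer = lambda m: json.loads(m).decode('utf-8')

# Raw byte values like those shown in the ConsumerRecord output above.
json_value = b'{"dataObjectID": "test1"}'
plain_value = b'test'


def failure(fn, raw):
    """Hypothetical helper: return the exception fn(raw) raises, or None."""
    try:
        fn(raw)
    except Exception as exc:
        return exc
    return None


# Valid JSON parses to a dict, and a dict has no .decode() -> AttributeError.
print(type(failure(bad_deserializer, json_value)).__name__)
# Non-JSON text fails in json.loads -> JSONDecodeError (a ValueError subclass).
print(type(failure(bad_deserializer, plain_value)).__name__)
```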


I prefer a simpler value_deserializer=lambda x: x.decode('utf-8'), and then later doing message_dict = json.loads(message.value), where you can process it conditionally or wrap it in exception handling. - MarkHu
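The approach in this comment might look roughly like the sketch below. parse_message_value is a hypothetical helper name, and the list of raw values only stands in for message values as they would arrive from Kafka, so it runs without a broker. Catching ValueError also covers json.JSONDecodeError on Python 3.

```python
import json

# Deserializer as suggested in the comment: bytes -> str only.
value_deserializer = lambda x: x.decode('utf-8')


def parse_message_value(text):
    """Hypothetical helper: parse JSON text, returning None for non-JSON text."""
    try:
        return json.loads(text)
    except ValueError:  # json.JSONDecodeError subclasses ValueError on Python 3
        return None


# Simulated raw values standing in for records read from 'my-topic'.
raw_values = [b'{"dataObjectID": "test1"}', b'test']
parsed = [parse_message_value(value_deserializer(v)) for v in raw_values]
print(parsed)  # [{'dataObjectID': 'test1'}, None]
```

Keeping the deserializer trivial means a bad record no longer raises inside kafka-python's fetch callback; the decision about what to do with it moves into your own loop.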
3 Answers


I solved my problem by first decoding the message as UTF-8 and then parsing it with json.loads:

value_deserializer=lambda m: json.loads(m.decode('utf-8'))

instead of:

value_deserializer=lambda m: json.loads(m).decode('utf-8')

Hopefully this works on the producer side as well.
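The producer serializer from the question, json.dumps(v).encode('utf-8'), is already the mirror image of this deserializer: it goes object to JSON text to UTF-8 bytes, and the deserializer undoes those steps in reverse order. A minimal round trip without Kafka, assuming Python 3:

```python
import json

# Producer side (from the question): Python object -> JSON text -> UTF-8 bytes.
value_serializer = lambda v: json.dumps(v).encode('utf-8')
# Consumer side (from this answer): UTF-8 bytes -> JSON text -> Python object.
value_deserializer = lambda m: json.loads(m.decode('utf-8'))

payload = {"dataObjectID": "test1"}
wire_bytes = value_serializer(payload)  # the bytes Kafka would store
restored = value_deserializer(wire_bytes)

print(wire_bytes)           # b'{"dataObjectID": "test1"}'
print(restored == payload)  # True
```

Since the round trip succeeds, the producer side should not need any change.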

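It turned out the problem was the decode part of value_deserializer=lambda m: json.loads(m).decode('utf-8'). When I changed it to value_deserializer=lambda m: json.loads(m), I found that the object read from Kafka is now a dict. According to the following table from the Python json documentation, that is the expected result: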
| JSON          | Python    |
|---------------|-----------|
| object        | dict      |
| array         | list      |
| string        | unicode   |
| number (int)  | int, long |
| number (real) | float     |
| true          | True      |
| false         | False     |
| null          | None      |
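A quick way to check the table is to decode one document that contains every JSON type. This assumes Python 3, where string maps to str rather than unicode, int and long are merged into int, and true/false come back as bool.

```python
import json

# One JSON object containing each type from the table.
doc = ('{"object": {}, "array": [1, 2], "string": "x", "int": 3, '
       '"real": 1.5, "true": true, "false": false, "null": null}')
decoded = json.loads(doc)

# Print each JSON type next to the Python type it decoded to.
for json_type, value in decoded.items():
    print(json_type, '->', type(value).__name__)
```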

So can you tell me how your problem was solved? I'm getting the same error message as you. - T Anna

You don't need a lambda at all. Instead of

value_deserializer=lambda m: json.loads(m)

you can use

value_deserializer=json.loads
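Passing json.loads directly works because it already takes a single str (or, on Python 3.6+, bytes) argument, which is how value_deserializer is called. Note the trailing s: json.load expects a file-like object with a .read() method, so it would need the bytes wrapped first, for example in io.BytesIO. A small check without Kafka, assuming Python 3.6+:

```python
import io
import json

# json.loads itself can serve as the deserializer; no lambda needed.
value_deserializer = json.loads
raw = b'{"dataObjectID": "test2"}'

from_loads = value_deserializer(raw)
# json.load (no "s") reads from a file-like object rather than a bytes value.
from_load = json.load(io.BytesIO(raw))

print(from_loads)               # {'dataObjectID': 'test2'}
print(from_loads == from_load)  # True
```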

Page content provided by Stack Overflow.