Kafka序列化器JSON

9

我对Kafka、序列化和JSON还不熟悉。

我的需求是:生产者通过Kafka发送JSON文件,消费者以原始文件形式消费并处理JSON文件。

我已经将JSON转换为字符串,并通过String Serializer发送它。消费者会解析该字符串并重新创建JSON对象,但我担心这并不高效或正确(可能会丢失JSON的字段类型)

因此,我尝试制作一个JSON序列化器,并在生产者的配置中设置该序列化器。

我使用了JsonEncoder: Kafka: writing custom serializer

但当我现在尝试运行我的生产者时,似乎编码器的toBytes函数中的try块永远没有像我期望的那样返回任何内容。

try {
            bytes = objectMapper.writeValueAsString(object).getBytes();

        } catch (JsonProcessingException e) {
            logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }

看起来 objectMapper.writeValueAsString(object).getBytes();将我的JSON对象({"name":"Kate","age":25})转换成了空值,

这是我的生产者运行函数。

List<KeyedMessage<String,JSONObject>> msgList=new ArrayList<KeyedMessage<String,JSONObject>>();   

    JSONObject record = new JSONObject();

    record.put("name", "Kate");
    record.put("age", 25);

    msgList.add(new KeyedMessage<String, JSONObject>(topic, record));

    producer.send(msgList);

我有一个问题,我原来的方法(将其转换为字符串并发送,然后重新构建JSON对象)可以吗?或者说这不是正确的方法?
谢谢!

你解决了这个问题吗? - Bector2
在您删除的答案中,您提到了“意识到我有不兼容的Jackson Jars”,所以问题已经解决了吗? - bummi
2个回答

6

你为什么担心序列化/反序列化会导致数据丢失呢?

你可以选择使用包含在Confluent Schema Registry中的Kafka JSON序列化程序,这是免费开源软件(声明:我在Confluent工作)。它的测试套件提供了一些示例以帮助你入门,并且更多细节在序列化程序和格式化程序中有描述。这个JSON序列化程序和模式注册表本身的好处是,它们为Kafka的生产者和消费者客户端提供透明的集成。除了JSON之外,如果需要,还支持Apache Avro。

在我看来,这个设置是与Kafka以JSON交互时开发人员方便和易用性最好的选项之一 - 当然,你的情况可能不同!


1
我建议将您的事件字符串转换为字节数组,例如:

byte[] eventBody = event.getBody();

这样会提高性能,并且Kafka消费者还提供了JSON解析器,可以帮助您获取JSON数据。如果需要进一步的信息,请让我知道。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接