我对Kafka、序列化和JSON还不熟悉。
我的需求是:生产者通过Kafka发送JSON文件,消费者以原始文件形式消费并处理JSON文件。
我已经将JSON转换为字符串,并通过String Serializer发送它。消费者会解析该字符串并重新创建JSON对象,但我担心这并不高效或正确(可能会丢失JSON的字段类型)
因此,我尝试制作一个JSON序列化器,并在生产者的配置中设置该序列化器。
我使用了JsonEncoder: Kafka: writing custom serializer
但当我现在尝试运行我的生产者时,似乎编码器的toBytes函数中的try块永远没有像我期望的那样返回任何内容。
try {
bytes = objectMapper.writeValueAsString(object).getBytes();
} catch (JsonProcessingException e) {
logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
}
看起来 objectMapper.writeValueAsString(object).getBytes()
;将我的JSON对象({"name":"Kate","age":25}
)转换成了空值,
这是我的生产者运行函数。
List<KeyedMessage<String,JSONObject>> msgList=new ArrayList<KeyedMessage<String,JSONObject>>();
JSONObject record = new JSONObject();
record.put("name", "Kate");
record.put("age", 25);
msgList.add(new KeyedMessage<String, JSONObject>(topic, record));
producer.send(msgList);
我有一个问题,我原来的方法(将其转换为字符串并发送,然后重新构建JSON对象)可以吗?或者说这不是正确的方法?
谢谢!