发送消息到kafka主题时序列化错误

21
我需要测试一个包含头部信息的消息,所以我需要使用MessageBuilder,但是我无法序列化它。
我尝试在生产者属性中添加序列化设置,但没有起作用。
有人能帮助我吗?
错误信息如下:
org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.springframework.messaging.support.GenericMessage to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

我的测试类:

public class TransactionMastercardAdapterTest extends AbstractTest{

@Autowired
private KafkaTemplate<String, Message<String>> template;

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);

@BeforeClass
public static void setUp() {
    System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}

@Test
public void sendTransactionCommandTest(){

    String payload = "{\"o2oTransactionId\" : \"" + UUID.randomUUID().toString().toUpperCase() + "\","
            + "\"cardId\" : \"11\","
            + "\"transactionId\" : \"20110405123456\","
            + "\"amount\" : 200.59,"
            + "\"partnerId\" : \"11\"}";

    Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, Message<String>> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<String, Message<String>> ("notification_topic", MessageBuilder.withPayload(payload)
            .setHeader("status", "RECEIVED")
            .setHeader("service", "MASTERCARD")
            .build()));

    Map<String, Object> configs = KafkaTestUtils.consumerProps("test1", "false", embeddedKafka);

    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    ConsumerFactory<byte[], byte[]> cf = new DefaultKafkaConsumerFactory<>(configs);

    Consumer<byte[], byte[]> consumer = cf.createConsumer();
    consumer.subscribe(Collections.singleton("transaction_topic"));
    ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000);
    consumer.commitSync();

    assertThat(records.count()).isEqualTo(1);
}

}

6个回答

5
我认为错误很明显:
Can't convert value of class org.springframework.messaging.support.GenericMessage to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

当你的值为GenericMessage时,但StringSerializer仅适用于字符串。

你需要的是称为JavaSerializer的东西,虽然它不存在,但编写起来并不难:

public class JavaSerializer implements Serializer<Object> {

    @Override
    public byte[] serialize(String topic, Object data) {
        try {
            ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
            ObjectOutputStream objectStream = new ObjectOutputStream(byteStream);
            objectStream.writeObject(data);
            objectStream.flush();
            objectStream.close();
            return byteStream.toByteArray();
        }
        catch (IOException e) {
            throw new IllegalStateException("Can't serialize object: " + data, e);
        }
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public void close() {

    }

}

并为value.serializer属性进行配置。


谢谢回答,但是不起作用。2017-04-25 11:11:12,133 错误 -kafka-listener-1 o.s.c.s.b.k.KafkaMessageChannelBinder:287 - 无法转换消息:ACED000573720 java.lang.StringIndexOutOfBoundsException: 字符串索引超出范围:-19 - Tiago Costa
org.springframework.messaging.converter.MessageConversionException: 无法读取JSON:意外字符('¬'(代码172)):期望有效值(数字,字符串,数组,对象,“true”,“false”或“null”) 在[来源:[B@4e076253;行:1,列:2];嵌套异常是com.fasterxml.jackson.core.JsonParseException:意外字符('¬'(代码172)):期望有效值(数字,字符串,数组,对象,“true”,“false”或“null”) 在[来源:[B@4e076253;行:1,列:2] - Tiago Costa
我认为问题在于GenericMessage类的两个属性,它创建了两个JSON,headers = {id = 067-879b0, service = MASTER, status = RECEIVED, timestamp = 1493130032409}和payload = "{..}"。也许这不是使用headers发送消息的正确方式。 - Tiago Costa
1
那完全是另一回事了。你说的是发送消息,但现在是关于读取消息。从SO的角度来看,你应该得到“-1”的评价。如果你想发送JSON,你应该考虑提前手动将消息写成JSON格式,然后使用producer.send()发送。 - Artem Bilan
错误发生在序列化过程中,在serialize方法的返回值中。但是感谢您的帮助。 - Tiago Costa
它运行得很完美,Artem Bilan。这是我的错误,在这里感谢你的帮助。 - Tiago Costa

4
private void configureProducer() {
Properties props = new Properties();
props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");

producer = new KafkaProducer<String, String>(props);

这将完成工作。


4
ByteArraySerializer 对我无效,但 JsonSerializer 有效。 - jumping_monkey

2

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer


将以下内容添加到您的application.properties文件中。 - CollinsKe

1
这是我用过的方法,对我很有效。
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class);

JsonSerializer文档中可以看到:

这是一个通用的序列化器,用于将Java对象以JSON格式发送到Kafka。


0
在我的情况下,我正在使用Spring Cloud,并且没有在属性文件中添加以下属性。
spring.cloud.stream.kafka.binder.configuration.value.serializer=org.apache.kafka.common.serialization.StringSerializer

-9

使用@XmlRootElement为JSON类添加注释


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接