Kafka Streams aggregation with a custom object data type

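I have a processor that reads JSON strings of type GenericRecord from a topic. I split the stream into two branches. I take the first branch, map each (key, value) pair to two strings (the name of a specific JSON field and that field's value), and group by key. Up to this point everything works. Now I need to aggregate the stream using a new user-defined type, but I get an exception.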

Here is the code:

The new type:

private class Tuple {

    public int occ;
    public int sum;


    public Tuple (int occ, int sum) {
        this.occ = occ;
        this.sum = sum;
    }

    public void sum (int toAdd) {
        this.sum += toAdd;
        this.occ ++;
    }

    public int getAverage () {
        return this.sum / this.occ;
    }

    public String toString() {
        return occ + "-> " + sum + ": " + getAverage();
    }
}

The part of the stream that works:

  StreamsBuilder builder = new StreamsBuilder();
    KStream<GenericRecord, GenericRecord> source =
          builder.stream(topic);

    KStream<GenericRecord, GenericRecord>[] branches = source.branch(
            (key,value) -> partition(value.toString()),
            (key, value) -> true
    );

    KGroupedStream <String, String> groupedStream = branches[0]
            .mapValues(value -> createJson(value.toString()))
            .map((key, value) -> KeyValue.pair(new String("T_DUR_CICLO"), value.getNumberValue("payload", "T_DUR_CICLO")))
            .peek((key, value) -> System.out.println("key=" + key + ", value=" + value))
            .groupByKey();

The problem:

   KTable<String, Tuple> aggregatedStream = groupedStream.aggregate(
            () -> new Tuple (0,0), // initializer 
            (aggKey, newValue, aggValue) -> new Tuple (aggValue.occ + 1, aggValue.sum + Integer.parseInt(newValue)));



    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();

This is the exception:

   Exception in thread "streamtest-6173d6a2-4a3a-4d76-b793-774719f8b1f5-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_0, processor=KSTREAM-SOURCE-0000000011, topic=streamtest-KSTREAM-AGGREGATE-STATE-STORE-0000000007-repartition, partition=0, offset=0
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:318)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:964)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (value: io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer) is not compatible to the actual value type (value type: com.mycompany.maventest.Streamer$Tuple). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:195)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:66)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
    at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:94)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
    ... 6 more
Caused by: java.lang.ClassCastException: com.mycompany.maventest.Streamer$Tuple cannot be cast to org.apache.avro.generic.GenericRecord
    at io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer.serialize(GenericAvroSerializer.java:39)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:191)
    ... 19 more

How can I solve this problem?

----- UPDATE ------

The producer writes Avro, so I have the following configuration properties:

 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);

If I specify a custom serde, the call looks like this:
 KTable<String, Tuple> aggregatedStream = groupedStream.aggregate(
            () -> new Tuple(0, 0), // initializer 
            (aggKey, newValue, aggValue) ->  new Tuple (aggValue.occ + 1, aggValue.sum + Integer.parseInt(newValue)),
            Materialized.with(Serdes.String(), new MySerde()));

Exception:

   Exception in thread "streamtest-17deb5c8-ed07-4fcf-bd59-37b75e44b83f-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:677)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:943)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:831)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

--- SOLVED ---- I also had to pass the new serdes to groupByKey, because the map step changes the key and value types:

 KGroupedStream <String, String> groupedStream = branches[0]
            .mapValues(value -> createJson(value.toString()))
            .map((key, value) -> KeyValue.pair(new String("T_DUR_CICLO"), value.getNumberValue("payload", "T_DUR_CICLO")))
            .peek((key, value) -> System.out.println("key=" + key + ", value=" + value))
            .groupByKey( Serialized.with(
                    Serdes.String(), /* key (note: type was modified) */
                    Serdes.String()));  /* value */
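
For context on the "Unknown magic byte!" error above: Confluent's Avro deserializers expect every payload to start with a one-byte marker (0) followed by a 4-byte schema ID. Bytes written by Serdes.String() are plain UTF-8 with no such header, which is consistent with the error appearing when the default GenericAvroSerde reads String data. The sketch below only illustrates that framing check; the class name MagicByteCheck and the schema ID 42 are made up for the example, and it does not use the Confluent library itself.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original post or of any library.
final class MagicByteCheck {

    // Confluent's wire format starts with this byte, then a 4-byte schema ID.
    static final byte MAGIC_BYTE = 0x0;

    // True if the payload has the 5-byte header an Avro deserializer would expect.
    static boolean looksLikeConfluentAvro(byte[] payload) {
        return payload != null && payload.length >= 5 && payload[0] == MAGIC_BYTE;
    }

    public static void main(String[] args) {
        // What Serdes.String() would write for the key used in the question.
        byte[] plain = "T_DUR_CICLO".getBytes(StandardCharsets.UTF_8);

        // A header as an Avro serializer would write it (schema ID 42 is invented).
        byte[] framed = ByteBuffer.allocate(5).put(MAGIC_BYTE).putInt(42).array();

        System.out.println(looksLikeConfluentAvro(plain));   // prints "false"
        System.out.println(looksLikeConfluentAvro(framed));  // prints "true"
    }
}
```
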
1 Answer

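Kafka Streams uses the default Serdes unless you specify them explicitly on an operation.

In the aggregate() call you define the value type as Tuple, whereas the default serde is for GenericRecord, so the exception is thrown. You need to specify the serdes like this: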

 KTable<String, Tuple> aggregatedStream = groupedStream.aggregate(
            () -> new Tuple (0,0), // initializer 
            (aggKey, newValue, aggValue) -> 
                 new Tuple (aggValue.occ + 1, aggValue.sum + Integer.parseInt(newValue))
                ,Materialized.with(keySerde, tupleSerde));

The operation will then use tupleSerde. You can find an example here: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#aggregating
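
The answer does not show tupleSerde itself, so here is a minimal sketch of the byte encoding such a serde could use, assuming a fixed 8-byte layout (occ, then sum). The names TupleCodec, toBytes and fromBytes are hypothetical, and the nested Tuple only mirrors the two fields from the question. The code is plain Java without the Kafka dependency so that it runs on its own.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of the original post: encodes a Tuple as two ints.
final class TupleCodec {

    // Mirrors the question's Tuple: occurrence count and running sum.
    static final class Tuple {
        final int occ;
        final int sum;

        Tuple(int occ, int sum) {
            this.occ = occ;
            this.sum = sum;
        }
    }

    private TupleCodec() {}

    // Would form the body of a Serializer<Tuple>.serialize(topic, data).
    static byte[] toBytes(Tuple t) {
        if (t == null) {
            return null; // null values (tombstones) stay null
        }
        return ByteBuffer.allocate(2 * Integer.BYTES).putInt(t.occ).putInt(t.sum).array();
    }

    // Would form the body of a Deserializer<Tuple>.deserialize(topic, bytes).
    static Tuple fromBytes(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        if (bytes.length != 2 * Integer.BYTES) {
            throw new IllegalArgumentException("Expected 8 bytes, got " + bytes.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        return new Tuple(buf.getInt(), buf.getInt());
    }

    public static void main(String[] args) {
        Tuple restored = fromBytes(toBytes(new Tuple(3, 12)));
        System.out.println(restored.occ + " " + restored.sum); // prints "3 12"
    }
}
```

To turn this into a serde, one option is to implement org.apache.kafka.common.serialization.Serializer<Tuple> and Deserializer<Tuple> by delegating to these two methods, then combine them with Serdes.serdeFrom(serializer, deserializer) and pass the result as tupleSerde. A JSON or Avro encoding would work equally well; the fixed binary layout is chosen here only because it is short.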

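What is this tupleSerde, and do you have an example implementation? - Sam Shiles
TupleSerde follows from the OP's requirement. You need to write an implementation yourself, just like JsonSerde or any other serde. - Nishu Tayal
What if I need to use Materialized.as() instead of Materialized.with() in order to specify store parameters? How do I provide the value serde? In that case withValueSerde() seems to expect a Serde of the newValue type. - Sergey Shcherbakov
@SergeyShcherbakov Materialized supports a fluent interface. You can specify both: Materialized.as("name-for-state-store").withKeySerde(keySerde).withValueSerde(valueSerde) - Benissimo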
