Flink Kafka EXACTLY_ONCE导致KafkaException:ByteArraySerializer不是Serializer的实例

10
所以,我试图在我的Flink Kafka流处理作业中启用EXACTLY_ONCE语义,同时进行检查点。
然而,我无法让它工作,所以我尝试从Github下载测试示例代码: https://github.com/apache/flink/blob/c025407e8a11dff344b587324ed73bdba2024dff/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java 这样运行是没问题的。但是,当我启用检查点时,我会收到错误消息。 如果我将EXACTLY_ONCE更改为AT_LEAST_ONCE语义并启用检查点,则可以正常工作。但是当我将其更改为EXACTLY_ONCE时,我又会收到此错误信息。
我得到的异常:
org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
    at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1099)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1036)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:430)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:76)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:360)
    ... 12 more

我已经对代码进行了轻微的更改,以适应我的环境。我正在Docker内部的Flink操作游乐场中运行它。(此链接为最新版本1.10,其中提供的Kafka版本是2.2.1)。

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(1_000);

        String inputTopic = "my-input";
        String outputTopic = "my-output";
        String kafkaHost = "kafka:9092";

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");


        DataStream<KafkaEvent> input = env
                .addSource(new FlinkKafkaConsumer<>(inputTopic, new KafkaEventSchema(), kafkaProps)
                        .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
                .keyBy("word")
                .map(new RollingAdditionMapper());

        input.addSink(
                new FlinkKafkaProducer<>(
                        outputTopic,
                        new KafkaEventSerializationSchema(outputTopic),
                        kafkaProps,
                        FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute("Modern Kafka Example");
    }

可以在示例中找到其他类: https://github.com/apache/flink/tree/c025407e8a11dff344b587324ed73bdba2024dff/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base

我尝试将序列化更改为使用KafkaSerializationSchema而不是使用SerializationSchema的示例代码。 然而下面这段代码也没有帮助。出现了相同的错误。

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaEventSerializationSchema implements KafkaSerializationSchema<KafkaEvent> {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    private String topic;

    public KafkaEventSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(KafkaEvent element, Long timestamp) {
        return new ProducerRecord<>(topic, element.toString().getBytes());
    }


}

非常感谢您的帮助。我尚未能够找到关于Flink和Kafka之间EXACTLY_ONCE保证的在线工作代码。只有大量的文章谈论它,但没有实际的实质性工作代码。这就是我在这里尝试实现的全部内容。


在这里(https://dev59.com/trfna4cB1Zd3GeqPozqP#58644689),他们在类“ObjSerializationSchema”上实现了自己的“KafkaSerializationSchema”。我猜它可以帮助解决你的问题,因为这是一个序列化错误。 - Felipe
所以,KafkaEventSerializationSchema是您创建的实现了KafkaSerializationSchema接口的类。在Java文档中有这样一个警告:如果您的序列化模式需要关于可用分区和并行子任务数量以及运行Kafka生产者的子任务ID的信息,请还要实现KafkaContextAware接口。 https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/index.html?org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.html 。也许,如果您发布您的类,那么很容易就可以确定错误所在。 - Felipe
但这不是异常所说的。如果 transaction.max.timeout.ms 是个问题,它应该设置为多少呢?如果默认是 15 分钟,并且在尝试运行作业时立即出现此异常,我看不出它怎么可能在几秒钟内超时了。 - Olindholm
@WiggyLindholm,你解决了这个问题吗? - vikash dat
@vikash-dat 不是很确定。我来回进行了许多调整,但仍然无法使其正常工作。异常似乎越来越少,但有时仍会发生。我不知道为什么,我从同事那里得到了一些代码(我不能公开分享),但就我所看到的而言,它们几乎相同,所以我不知道为什么现在它可以工作,或者为什么当时它不能工作。抱歉。也许如果我有时间,我可以尝试获得一个可工作的示例并发布在github上。 - Olindholm
显示剩余4条评论
2个回答

0

我遇到了相同的问题,通过显式设置生产者的超时时间得以解决。 properties.setProperty("transaction.timeout.ms", "900000");


0
以下参数需要启用EXACTLY_ONCE处理:
  • 您的Kafka代理上的transaction.max.timeout.ms应超过预期的最大Flink和/或Kafka停机时间。
  • transaction.timeout.ms的值应小于transaction.max.timeout.ms
例如,
Properties producerProperties = new Properties();
producerProperties.setProperty("transaction.timeout.ms", "60000");

除此之外,您还需要为setTransactionalIdPrefix设置值。这个值必须对于在同一个Kafka集群中运行的每个Flink应用程序都是唯一的。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接