Apache Flink:由TupleSerializer引起的空指针异常

8
当我执行Flink应用程序时,它会给我一个NullPointerException: 的错误提示。
 2017-08-08 13:21:57,690 INFO com.datastax.driver.core.Cluster  - New  Cassandra host /127.0.0.1:9042 added 
 2017-08-08 13:22:02,427 INFO  org.apache.flink.runtime.taskmanager.Task                     - TriggerWindow(TumblingEventTimeWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@15d1c80b}, EventTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:302)) -> Filter -> Flat Map -> Sink: Cassandra Sink (1/1) (092a7ef50209f7a050d9d82be1e03d80) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:400)
    at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:682)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:43)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:31)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:504)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
    ... 7 more
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
    at          org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
    ... 22 more
1个回答

3

Flink的元组序列化程序不支持空值。您应该检查元组是否包含空字段。

替代元组的选择是POJO或Row类型。Row支持任意数量的可空字段,但需要明确的类型说明。


为什么它们不支持空值?我们能让它们支持吗? - Ori Popowski
这是一项设计决策,旨在提高序列化/反序列化的效率。添加 null 支持意味着要添加一个位向量来指示空字段,从而破坏二进制格式的兼容性。如果需要 null 支持,可以使用 Row 类型。 - Fabian Hueske

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接