使用Apache Flink时如何使用Collections$UnmodifiableCollection?

10

使用下面的代码与 Apache Flink 一起使用:

DataStream<List<String>> result = source.window(Time.of(1, TimeUnit.SECONDS)).mapWindow(new WindowMapFunction<String, List<String>>() {

    @Override
    public void mapWindow(Iterable<String> iterable, Collector<List<String>> collector) throws Exception {
        List<String> top5 = Ordering.natural().greatestOf(iterable, 5);
        collector.collect(top5);
    }
}).flatten();

我收到了这个异常

Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:110)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:41)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:125)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:127)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
    at java.lang.Thread.run(Thread.java:745)

我该如何在Flink中使用UnmodifiableCollection

2个回答

21
问题在于Kryo的默认CollectionSerializer不能再次反序列化集合,因为它是不可修改的(.add()调用失败)。为了解决这个问题,我们可以使用kryo-serializers项目中的UnmodifiableCollectionsSerializer。Flink传递依赖于该项目,所以无需将其添加为依赖项。接下来,我们必须向Flink的Kryo实例注册该序列化程序。
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
see.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);

通常情况下,我们不需要调用Class.forName()来注册一个序列化程序。但在这种情况下,java.util.Collections$UnmodifiableCollection是包可见的,因此我们无法直接访问该类。

4
谢谢你这个深入而及时的回答。你的回答速度令人惊叹 :-) - Till Rohrmann
这非常有帮助!谢谢罗伯特! - jatinpreet
太好了!它运行得很好。我遇到了两个不可修改的异常:UnmodifiableCollection和UnmodifiableMap。 - 何德福

3

由于kryo-serializers代码库在过去几年中未发生改变,因此您可以尝试使用这个。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val avroKryoSerializerUtil = new AvroKryoSerializerUtils
avroKryoSerializerUtil.addAvroSerializersIfRequired(env.getConfig,classOf[GenericData.Record])

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接