Using a Collections$UnmodifiableCollection with Apache Flink

While using Apache Flink with the following code:

DataStream<List<String>> result = source
    .window(Time.of(1, TimeUnit.SECONDS))
    .mapWindow(new WindowMapFunction<String, List<String>>() {

        @Override
        public void mapWindow(Iterable<String> iterable, Collector<List<String>> collector) throws Exception {
            List<String> top5 = Ordering.natural().greatestOf(iterable, 5);
            collector.collect(top5);
        }
    })
    .flatten();

I got this exception:

Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:110)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:41)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:125)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:127)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
    at java.lang.Thread.run(Thread.java:745)

How can I use an UnmodifiableCollection with Flink?

Michaelson answered 8/9, 2015 at 8:38 Comment(0)

The problem is that Kryo's default CollectionSerializer cannot deserialize the collection, because the collection is unmodifiable (the .add() call fails).
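
That is standard JDK behaviour for the unmodifiable wrappers; a minimal stand-alone snippet (not from the original job) that reproduces the failing call:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

List<String> wrapped = Collections.unmodifiableList(Arrays.asList("a", "b"));
// Kryo's CollectionSerializer instantiates the collection and then calls add()
// while deserializing, which is exactly the call that throws UnsupportedOperationException.
wrapped.add("c");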

To resolve the issue, we can use the UnmodifiableCollectionsSerializer from the kryo-serializers project. Flink transitively depends on the project, so there is no need to add it as a dependency.

Next, we have to register the serializer with Flink's Kryo instances.

import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
see.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);

Usually, we don't have to call Class.forName() to register a serializer, but in this case java.util.Collections$UnmodifiableCollection is package-private, so we cannot reference the class literal directly.
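
If the other unmodifiable wrappers show up in the same way (one of the comments below mentions hitting UnmodifiableMap as well), the same serializer can be registered for them too. A sketch of that, reusing the see environment from above and the same Class.forName() trick; UnmodifiableCollectionsSerializer from kryo-serializers is written to cover the map wrappers as well:

String[] wrappers = {
    "java.util.Collections$UnmodifiableCollection",
    "java.util.Collections$UnmodifiableList",
    "java.util.Collections$UnmodifiableSet",
    "java.util.Collections$UnmodifiableMap"
};
for (String name : wrappers) {
    see.getConfig().addDefaultKryoSerializer(Class.forName(name), UnmodifiableCollectionsSerializer.class);
}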

Michaelson answered 8/9, 2015 at 8:38 Comment(3)
thanks for this insightful and prompt answer. Your answering speed is staggering :-)Tortious
This was so helpful! Thanks Robert!Zoilazoilla
Great! It works fine. I hit two unmodifiable exceptions: UnmodifiableCollection and UnmodifiableMapPhotima

You can also try this approach, since the kryo-serializers repo has not changed in the last few years:

import org.apache.avro.generic.GenericData
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils // requires org.apache.flink:flink-avro

val env = StreamExecutionEnvironment.getExecutionEnvironment
val avroKryoSerializerUtil = new AvroKryoSerializerUtils
avroKryoSerializerUtil.addAvroSerializersIfRequired(env.getConfig, classOf[GenericData.Record])
Inscrutable answered 18/8, 2021 at 12:39 Comment(0)

This answer from the Apache mailing list sounds valid and can be helpful:

https://lists.apache.org/thread/8f21loz5915dzw8cy2q8c08kxypvj1sq

I would recommend using the AvroSerializer for serializing GenericRecords. You have to add org.apache.flink:flink-avro as a dependency to your job and then tell the system that you would like to use the GenericRecordAvroTypeInfo via

DataStream<GenericRecord> sourceStream =
    env.addSource(new AvroGenericSource())
       .returns(new GenericRecordAvroTypeInfo(schema));

You can find more information about it here [1].

[1] https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro
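
A slightly fuller sketch of that recommendation, assuming flink-avro is on the classpath; AvroGenericSource is only the placeholder name of a SourceFunction<GenericRecord> from the quoted thread, and the schema string here is just an example:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Example\",\"fields\":[{\"name\":\"value\",\"type\":\"string\"}]}");

// Declaring the Avro type information lets Flink use the AvroSerializer for
// GenericRecord instead of falling back to Kryo.
DataStream<GenericRecord> sourceStream =
        env.addSource(new AvroGenericSource())
           .returns(new GenericRecordAvroTypeInfo(schema));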

Poised answered 12/12, 2023 at 13:58 Comment(0)
