I have a Scala Flink application that parses JSON with the Jackson library. Parsing is handled by a custom method that uses lazy initialization to keep it fast.
Now, for some reason, passing the model with lazy values further down the Flink pipeline causes strange errors in the util.Iterator that backs the JSON reading. I suspect the issue is actually coming from Kryo, but I don't know how to confirm it. Worth noting: eagerly initializing the model (with .toList) in the same Flink map fixes the issue. But that is not what I want; I want to pass my lazy model further.
At the end I link a repository with demo code, but I want to provide all the details here on Stack Overflow as well.
An example model and parsing definition:
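import java.util
import com.fasterxml.jackson.databind.JsonNode
import scala.collection.JavaConverters._
// Collection and toData are defined elsewhere in the demo project (not shown here).
case class Root(items: Collection[Data])
case class Data(data: Collection[Double])
def toRoot(node: JsonNode): Root = {
  // Iterate over "items" if present, otherwise over the node's own elements.
  val data: util.Iterator[JsonNode] = if (node.hasNonNull("items")) node.get("items").elements() else node.elements()
  // Lazy: toData runs only when `items` is traversed.
  val items: Collection[Data] = data.asScala.map(x => toData(x))
  Root(items)
}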
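To illustrate what the lazy model actually holds, here is a minimal, Jackson-free sketch. It assumes a java.util.ArrayList can stand in for the JSON array behind node.elements() (the stack traces do show an ArrayList$Itr wrapped by JIteratorWrapper); the names LazyIteratorDemo, source and lazyItems are hypothetical. It shows that asScala.map builds a one-shot iterator: the mapping function does not run until the iterator is traversed, and a second traversal sees nothing.

```scala
import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConverters._

// Hypothetical demo: a java.util.ArrayList stands in for the JSON array
// behind node.elements(); no Jackson involved.
object LazyIteratorDemo {
  // Counts how many times the mapping function actually runs.
  var evaluations = 0

  def source(): java.util.Iterator[Int] = {
    val list = new JArrayList[Int]()
    list.add(1)
    list.add(2)
    list.add(3)
    list.iterator()
  }

  // Mirrors data.asScala.map(x => toData(x)): returns a lazy Scala Iterator.
  def lazyItems(): Iterator[Int] =
    source().asScala.map { x => evaluations += 1; x * 10 }

  def main(args: Array[String]): Unit = {
    val items = lazyItems()
    println(evaluations)  // 0: nothing has been mapped yet
    println(items.toList) // List(10, 20, 30): the first traversal consumes it
    println(items.toList) // List(): the iterator is already exhausted
  }
}
```

So a value built this way carries a live iterator (and, presumably, the list it points into) rather than parsed data; the parsing effectively happens wherever the iterator is first consumed.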
The JSON data looks something like this:
{
"items": [
{
"data": [
11.71476355252127,
48.342882259940176,
507.3,
11.714791605037252,
...
Doing everything in a single map works:
env.fromCollection(Seq(input))
.map(i => flatten(read(i)))
.print()
but passing the model on to a second map fails:
env.fromCollection(Seq(input))
.map(i => read(i))
.map(i => flatten(i))
.print()
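In plain Scala, without Flink, both shapes behave the same, which suggests the composition itself is not the problem. This sketch is not the demo project's code: LazyRoot, read and flatten are hypothetical stand-ins, and a java.util.ArrayList replaces the Jackson node.

```scala
import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConverters._

// Plain Scala, no Flink. LazyRoot, read and flatten are hypothetical stand-ins
// for the question's model and functions.
object CompositionDemo {
  final case class LazyRoot(items: Iterator[Double])

  // Builds a lazy model: items wraps a live ArrayList iterator.
  def read(input: Seq[Double]): LazyRoot = {
    val list = new JArrayList[Double]()
    input.foreach(x => list.add(x))
    LazyRoot(list.iterator().asScala.map(identity))
  }

  // Consumes the iterator exactly once.
  def flatten(root: LazyRoot): List[Double] = root.items.toList

  // Shape of .map(i => flatten(read(i))): read and flatten in one step.
  def fused(inputs: List[Seq[Double]]): List[List[Double]] =
    inputs.map(i => flatten(read(i)))

  // Shape of .map(i => read(i)).map(i => flatten(i)): the LazyRoot values are
  // handed from the first map to the second as-is (no copy, no serialization).
  def split(inputs: List[Seq[Double]]): List[List[Double]] =
    inputs.map(i => read(i)).map(i => flatten(i))

  def main(args: Array[String]): Unit = {
    val inputs = List(Seq(1.5, 2.5), Seq(3.0))
    println(fused(inputs)) // List(List(1.5, 2.5), List(3.0))
    println(split(inputs)) // List(List(1.5, 2.5), List(3.0))
  }
}
```

What this sketch leaves out is whatever Flink does with the record between the two operators, which is the only place the two pipelines differ.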
It fails with the following errors:
- Scala 2.11
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911)
at java.util.ArrayList$Itr.next(ArrayList.java:861)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
- Scala 2.12
Caused by: java.lang.NullPointerException
at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:57)
... 29 more
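One way to probe the serialization suspicion outside Flink is to check whether the lazy model can be serialized at all. This sketch uses plain Java serialization as a stand-in, which is an assumption: Flink uses its own serializers and falls back to Kryo for generic types, and Kryo does not require Serializable. So it only illustrates that the lazy value is runtime state (an ArrayList iterator plus a map closure) rather than plain data, while the .toList version is an ordinary immutable List. RoundTripCheck, canSerialize, lazyModel and eagerModel are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConverters._

// Hypothetical helper: Java serialization stands in for the copy/serialize step
// a record may go through between Flink operators.
object RoundTripCheck {
  // True if `value` can be written with Java serialization, false otherwise.
  def canSerialize(value: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(value) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // A java.util.ArrayList stands in for the JSON array behind node.elements().
  private def javaIterator(): java.util.Iterator[Double] = {
    val list = new JArrayList[Double]()
    list.add(1.5)
    list.add(2.5)
    list.iterator()
  }

  // Lazy model: a live ArrayList iterator wrapped by asScala plus a map closure.
  def lazyModel(): Iterator[Double] = javaIterator().asScala.map(_ * 2)

  // Eager model: the same values materialized with .toList.
  def eagerModel(): List[Double] = javaIterator().asScala.map(_ * 2).toList

  def main(args: Array[String]): Unit = {
    println(canSerialize(lazyModel()))  // false
    println(canSerialize(eagerModel())) // true
    println(eagerModel())               // List(3.0, 5.0)
  }
}
```

A plausible, unconfirmed reading of the stack traces is that Flink copies or serializes the record between the two map operators (with object reuse disabled, which is the default, records are copied even between chained operators), and Kryo then has to walk the same iterator and closure. Calling env.getConfig.disableGenericTypes() makes Flink throw an exception whenever a type would fall back to Kryo, which may help confirm whether Kryo is involved.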
I have created a demo project, available HERE, with all these examples ready to test on both Scala 2.11 and 2.12, since the two versions actually give different results.
Root(items.toList) instead? – Ejectment
map fixes the issue. This is why the model with List works. But I don't understand why I can't pass it lazily through all the downstream steps. I will add this to the question. – Transfiguration
map(i => flatten(read(i))) is valid syntax, I don't see why map(i => read(i)).map(i => flatten(i)) could conceptually not be valid. Perhaps it could be compiled to the same thing (so not giving benefits, but also not failing). I can definitely imagine that there may be reasons why it is awkward to implement, but conceptually it may be fine?! – Galba