Java Iterator created by Jackson breaks Scala Flink application
There is a Scala Flink application in which I parse JSON using the Jackson library. Parsing is handled by a custom method that relies on lazy initialization to keep it fast.

Now, for whatever reason, passing the model with lazy values further down the Flink pipeline causes strange errors in the util.Iterator that backs the JSON reading. I suspect the issue actually comes from Kryo, but I don't know how to confirm it. Worth noting: eagerly initializing the model (with .toList) in the same (Flink) map fixes the issue. But that is not what I want; I want to pass my lazy model further.
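To illustrate what "lazy" means here, a minimal sketch (no Flink involved, names made up for this example) showing that a mapped Java iterator is a one-shot cursor over live state, not a materialized collection:

```scala
import scala.collection.JavaConverters._

object OneShotDemo {
  def main(args: Array[String]): Unit = {
    val backing = new java.util.ArrayList[Int]
    backing.add(1); backing.add(2); backing.add(3)

    // asScala.map copies nothing: it wraps the live Java iterator
    val lazyView = backing.iterator().asScala.map(_ * 2)

    println(lazyView.toList) // List(2, 4, 6) -- first traversal drains it
    println(lazyView.toList) // List() -- the cursor is exhausted, not reset
  }
}
```

Anything that tries to treat such a value as plain data (copying it, serializing it, traversing it twice) sees iterator state, not the elements.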

At the end I link a repository with demo code, but I want to provide all the details on StackOverflow as well.

An example model and parsing definition:

import java.util
import scala.collection.JavaConverters._
import com.fasterxml.jackson.databind.JsonNode

// Collection is a type alias defined elsewhere in the project
case class Root(items: Collection[Data])
case class Data(data: Collection[Double])

def toRoot(node: JsonNode): Root = {
    val data: util.Iterator[JsonNode] =
      if (node.hasNonNull("items")) node.get("items").elements() else node.elements()
    val items: Collection[Data] = data.asScala.map(x => toData(x))
    Root(items)
}
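For contrast, a minimal sketch of the eager variant that does work for me. The names DataL, RootL and toRootEager are made up for this example, and plain List stands in for the Collection alias from the project:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import scala.collection.JavaConverters._

object EagerDemo {
  // Simplified model: plain List instead of the project's Collection alias
  case class DataL(data: List[Double])
  case class RootL(items: List[DataL])

  private val mapper = new ObjectMapper()

  // Hypothetical eager counterpart of toRoot/toData: the .toList calls
  // drain the Jackson iterators before the model leaves the method
  def toRootEager(json: String): RootL = {
    val node  = mapper.readTree(json)
    val items = if (node.hasNonNull("items")) node.get("items").elements() else node.elements()
    RootL(items.asScala.map { item =>
      DataL(item.get("data").elements().asScala.map(_.asDouble()).toList)
    }.toList)
  }

  def main(args: Array[String]): Unit =
    println(toRootEager("""{"items":[{"data":[11.7,48.3]}]}"""))
}
```

Because everything is materialized inside the method, the returned model carries no reference to Jackson's iterators and can safely cross operator boundaries.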

and the JSON data is something like:

{
  "items": [
    {
      "data": [
        11.71476355252127,
        48.342882259940176,
        507.3,
        11.714791605037252,
        ...

and doing it all in one map works:

env.fromCollection(Seq(input))
   .map(i => flatten(read(i)))
   .print()

but passing it further fails:

env.fromCollection(Seq(input))
   .map(i => read(i))
   .map(i => flatten(i))
   .print()

With the errors:

  • Scala 2.11
Caused by: java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911)
    at java.util.ArrayList$Itr.next(ArrayList.java:861)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
  • Scala 2.12
Caused by: java.lang.NullPointerException
    at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
    at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:57)
    ... 29 more
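For reference, a minimal sketch (no Flink or Kryo needed) showing that the iterator backing the lazy model is not serializable even under plain Java serialization. My suspicion, which this does not prove, is that Kryo's field-by-field write of the same kind of object is what produces the traces above:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object IteratorSerializationDemo {
  def main(args: Array[String]): Unit = {
    val list = new java.util.ArrayList[Int]
    list.add(1); list.add(2)

    // java.util.ArrayList$Itr (the class in the 2.11 trace) does not
    // implement Serializable, so Java serialization rejects it outright
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(list.iterator())
      println("serialized")
    } catch {
      case _: NotSerializableException =>
        println("java.util.ArrayList$Itr is not serializable")
    }
  }
}
```

Kryo does not stop at the Serializable check; it walks the object's fields, which would explain why the failure mode differs between Scala versions.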

I have created a demo project with all of these examples, ready to test with Scala 2.11 and 2.12 (which actually give different results), available HERE.

Transfiguration answered 21/1, 2021 at 12:3 — Comments (6)
Can you try with Root(items.toList) instead? — Ejectment
I did specify this in my repo but forgot to mention it here. Initializing the list in the same map fixes the issue; this is why the model with List works. But I don't understand why I can't pass it lazily to all the steps further down. I will add this to the question. — Transfiguration
My wild guess would be that List is reliably serializable because it is just a data structure containing evaluated values. An Iterator "creates" values as it goes and may contain functions/closures/a connection to a DB/other state that cannot be reliably serialized and deserialized. And you do serialize things when you use distributed computing with Spark/Flink. — Internationale
I am aware of serialization; that is why I pointed to Kryo in my question. Do you have any idea how to make it work using lazy values then :-)? — Transfiguration
@Transfiguration how do you expect such an approach to work in general? A "lazy value" is a function. Its structure is opaque to the serialization library. To serialize it, the library has to either materialize the value (so no more laziness), try to serialize it as JVM bytecode (so no more efficiency), or crash. — Negativism
Perhaps I am underthinking this, but if map(i => flatten(read(i))) is valid syntax, I don't see why map(i => read(i)).map(i => flatten(i)) could conceptually not be valid. Perhaps it could be compiled to the same thing (so not giving benefits, but also not failing). I can definitely imagine there may be reasons why it is awkward to implement, but conceptually it may be fine?! — Galba
