Spark - Serializing an object with a non-serializable member

I am going to ask this question in the context of Spark, because that's what I'm facing, but this might be a plain Java issue.

In our Spark job, we have a Resolver that needs to be available on all of our workers (it's used in a UDF). The problem is that it's not serializable, and we cannot change it to be so. Our solution was to make it a member of another class that is serializable.

So we ended up with:

import java.io.Serializable;

public class Analyzer implements Serializable {
    // Resolver is not Serializable, so it is excluded from serialization.
    transient Resolver resolver;

    public Analyzer() {
        System.out.println("Initializing a Resolver...");
        resolver = new Resolver();
    }

    public int resolve(String key) {
        return resolver.find(key);
    }
}

We then broadcast this class using the Spark API:

 val analyzer = sparkContext.broadcast(new Analyzer())

(more information about Spark broadcast can be found here)

We then proceed to use analyzer in a UDF, as part of our Spark code, with something like:

import org.apache.spark.sql.functions.{col, udf}

val resolve = udf((key: String) => analyzer.value.resolve(key))
val result = myDataFrame.select(col("key"), resolve(col("key"))).count()

This all works as expected, but leaves us wondering.

Resolver does not implement Serializable and is, therefore, marked as transient, meaning it does not get serialized along with its owner object, Analyzer.

But as you can see clearly from the code above, the resolve() method uses resolver, so it must not be null. And indeed it isn't. The code works.

So if the field is not passed through serialization, how is the resolver member instantiated?

My initial thought was that maybe the Analyzer constructor is called on the receiving side (i.e. the Spark worker), but then I would expect to see the line "Initializing a Resolver..." printed several times. It's only printed once, which probably indicates that the constructor is only called once, right before the object is passed to the broadcast API. So why isn't resolver null?

Am I missing something about JVM serialization or Spark serialization?

How does this code even work?

Spark runs on YARN, in cluster mode. spark.serializer is set to org.apache.spark.serializer.KryoSerializer.

Arterio answered 21/1, 2018 at 19:53 Comment(5)
I don't have a full answer, but it looks like this happens as a result of some Kryo internal thingy (broadcast variables are serialized with Kryo; if you passed the variable through a closure or used org.apache.spark.serializer.JavaSerializer, it would be null as expected). – Upbow
According to the docs (spark.apache.org/docs/latest/tuning.html), if you didn't specify a serializer in the Spark context, you are using the default Java serialization... Kryo, on the other hand, just ignores the serialization declaration and transient fields (you can instruct it to skip transient fields, though: juanrh.github.io/doc/kryo/apidocs/com/esotericsoftware/kryo/…) – Spitball
@Spitball I find your comment puzzling, because: a) we have Kryo specifically declared; b) if we remove the transient it fails, and the stack trace leads to Kryo. – Arterio
@Arterio Oops, my bad, I missed the last two lines of the question... – Spitball
@Arterio Can you add the Kryo stack trace you get without the transient modifier? – Upgrade

So if the field is not passed through serialization, how is the resolver member instantiated?

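It is instantiated via the constructor call (new Resolver()). When Kryo deserializes the broadcast value, it creates the Analyzer instance by invoking its no-argument constructor, which in turn creates the Resolver. In Spark this happens in the call to kryo.readClassAndObject: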

kryo.readClassAndObject(input).asInstanceOf[T]
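
To see why the choice of serializer matters here, the following is a minimal sketch using only the JDK. The TransientDemo class and its nested Resolver are hypothetical stand-ins, and viaNoArgConstructor only imitates constructor-based instantiation with plain reflection; it is not Kryo's actual code path. The sketch shows that a Java serialization round trip never runs the Analyzer constructor, so the transient field comes back null, whereas creating the object through its no-argument constructor repopulates it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo {
    // Hypothetical stand-in for the real, non-serializable Resolver.
    static class Resolver {
        int find(String key) { return key.length(); }
    }

    static class Analyzer implements Serializable {
        private static final long serialVersionUID = 1L;
        transient Resolver resolver;

        Analyzer() { resolver = new Resolver(); }
    }

    // Java serialization round trip: the Analyzer constructor is not run on
    // deserialization, so the transient field keeps its default value (null).
    static Analyzer javaRoundTrip(Analyzer original) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Analyzer) in.readObject();
        }
    }

    // Assumption: imitates a deserializer that builds the object through its
    // no-argument constructor. Plain reflection, not Kryo itself.
    static Analyzer viaNoArgConstructor() throws ReflectiveOperationException {
        return Analyzer.class.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        Analyzer copy = javaRoundTrip(new Analyzer());
        System.out.println("Java serialization, resolver is null: " + (copy.resolver == null));

        Analyzer rebuilt = viaNoArgConstructor();
        System.out.println("No-arg constructor, resolver is null: " + (rebuilt.resolver == null));
    }
}
```

This matches the comment under the question that the field would be null with org.apache.spark.serializer.JavaSerializer.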

My initial thought was that maybe the Analyzer constructor is called on the receiving side (i.e. the spark worker), but then I would expect to see the line "Initializing a Resolver..." printed several times. But it's only printed once, which is probably an indication to the fact that it's only called once

That's not how a broadcast variable works. When an executor needs the broadcast variable, it first checks whether the object is already in its own BlockManager. If it isn't, the executor asks the driver or neighboring executors (if there are multiple executors on the same Worker node) for their cached copy; they send it in serialized form, and the receiving executor deserializes it and caches it in its own BlockManager. The value is therefore deserialized once per executor, not once per task.

This is documented in the behavior of the TorrentBroadcast (which is the default broadcasting implementation):

* The driver divides the serialized object into small chunks and
* stores those chunks in the BlockManager of the driver.
*
* On each executor, the executor first attempts to fetch the object from its BlockManager. If
* it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
* other executors if available. Once it gets the chunks, it puts the chunks in its own
* BlockManager, ready for other executors to fetch from.
*
* This prevents the driver from being the bottleneck in sending out multiple copies of the
* broadcast data (one per executor).
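
The check-then-fetch-then-cache behavior described above can be modeled with a toy example. This is only a sketch under stated assumptions: ToyExecutor, getBroadcast and deserializationsFor are hypothetical names, the map stands in for a BlockManager, and the Supplier stands in for "fetch the chunks and deserialize them". None of it is Spark's real API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class BroadcastCacheSketch {
    // Hypothetical toy model of one executor; not Spark's actual classes.
    static class ToyExecutor {
        // Stands in for the executor-local BlockManager.
        private final Map<Long, Object> blockManager = new HashMap<>();
        int deserializations = 0;

        Object getBroadcast(long broadcastId, Supplier<Object> fetchAndDeserialize) {
            Object cached = blockManager.get(broadcastId);
            if (cached == null) {
                // Not cached locally: fetch remotely, deserialize once, then cache.
                cached = fetchAndDeserialize.get();
                deserializations++;
                blockManager.put(broadcastId, cached);
            }
            return cached;
        }
    }

    // Runs `tasks` lookups of the same broadcast on one executor and reports
    // how many times the value had to be fetched and deserialized.
    static int deserializationsFor(int tasks) {
        ToyExecutor executor = new ToyExecutor();
        for (int task = 0; task < tasks; task++) {
            executor.getBroadcast(0L, Object::new);
        }
        return executor.deserializations;
    }

    public static void main(String[] args) {
        System.out.println("Deserializations for 5 tasks: " + deserializationsFor(5));
    }
}
```

In this model, any number of tasks on the same executor triggers a single deserialization, which is why a constructor invoked during deserialization would run once per executor rather than once per task.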

if we remove the transient it fails, and the stack-trace leads to Kryo

That is probably because your Resolver class contains a field that even Kryo is unable to serialize, whether or not the class implements Serializable.

Upgrade answered 22/1, 2018 at 15:38 Comment(0)
