Dealing with an incompatible version change of a serialization framework
Problem description

We have a Hadoop cluster on which we store data serialized to bytes with Kryo (a serialization framework). The Kryo version we used for this was forked from the official release 2.21 to apply our own patches for issues we had experienced with Kryo. The current Kryo version 2.22 also fixes these issues, but with different solutions. As a result, we cannot simply switch to the new Kryo version, because we would then no longer be able to read the data already stored on our Hadoop cluster. To address this problem, we want to run a Hadoop job which

  1. reads the stored data
  2. deserializes the data stored with the old version of Kryo
  3. serializes the restored objects with the new version of Kryo
  4. writes the new serialized representation back to our data store

The problem is that it is not trivial to use two different versions of the same class in one Java program (more precisely, in a Hadoop job's mapper class).

Question in a nutshell

How is it possible to deserialize and serialize an object with two different versions of the same serialization framework in one Hadoop job?

Relevant facts overview

  • We have data stored on a Hadoop CDH4 cluster, serialized with a Kryo version 2.21.2-ourpatchbranch
  • We want to have the data serialized with Kryo version 2.22, which is incompatible with our version
  • We build our Hadoop job JARs with Apache Maven

Possible (and impossible) approaches

(1) Renaming packages

The first approach that came to mind was to rename the packages in our own Kryo branch using the relocation functionality of the Maven Shade plugin and to release it under a different artifact ID, so that we could depend on both artifacts in our conversion job project. We would then instantiate one Kryo object for each version and use the old one for deserialization and the new one for serializing the object again.
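A relocation configuration for the forked artifact might look roughly like this (the shaded package prefix `legacy` is hypothetical; this is a sketch, not our actual build file):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <!-- Rewrites both the classes and all references to them -->
            <pattern>com.esotericsoftware.kryo</pattern>
            <shadedPattern>legacy.com.esotericsoftware.kryo</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

The relocated artifact can then coexist with the official Kryo 2.22 dependency on one classpath, since no class names collide.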

Problems
We don't use Kryo explicitly in Hadoop jobs, but rather access it through multiple layers of our own libraries. For each of these libraries, it would be necessary to

  1. rename involved packages and
  2. create a release with a different group or artifact ID

To make things even messier, we also use Kryo serializers provided by other third-party libraries, for which we would have to do the same thing.


(2) Using multiple class loaders

The second approach we came up with was to not depend on Kryo at all in the Maven project which contains the conversion job, but load the required classes from a JAR for each version, which is stored in Hadoop's distributed cache. Serializing an object would then look something like this:

public byte[] serialize(Object foo, JarClassLoader cl) throws Exception {
    // Load both Kryo classes through the version-specific classloader
    final Class<?> kryoClass = cl.loadClass("com.esotericsoftware.kryo.Kryo");
    final Class<?> outputClass = cl.loadClass("com.esotericsoftware.kryo.io.Output");

    Object kryo = kryoClass.getConstructor().newInstance();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    Object output = outputClass.getConstructor(OutputStream.class).newInstance(baos);

    // Reflective equivalent of kryo.writeObject(output, foo)
    Method writeObject = kryoClass.getMethod("writeObject", outputClass, Object.class);
    writeObject.invoke(kryo, output, foo);

    // Close the Kryo output to flush its buffer into the stream
    outputClass.getMethod("close").invoke(output);
    return baos.toByteArray();
}

Problems
Though this approach might work for instantiating an unconfigured Kryo object and serializing / restoring simple objects, we use a much more complex Kryo configuration, including several custom serializers, registered class IDs, and so on. For example, we were unable to figure out a way to set custom serializers for classes without getting a NoClassDefFoundError - the following code does not work:

Class<?> kryoClass = this.loadClass("com.esotericsoftware.kryo.Kryo");
Object kryo = kryoClass.getConstructor().newInstance();
Method addDefaultSerializer = kryoClass.getMethod("addDefaultSerializer", Class.class, Class.class);
addDefaultSerializer.invoke(kryo, URI.class, URISerializer.class); // throws NoClassDefFoundError

The last line throws a

java.lang.NoClassDefFoundError: com/esotericsoftware/kryo/Serializer

because the URISerializer class references Kryo's Serializer class and tries to load it using its own class loader (which is the System class loader), which does not know the Serializer class.
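This delegation behavior can be demonstrated without Kryo: a classloader resolves references only against its own classpath plus its parents', so a class defined by the system loader (like URISerializer here) cannot see classes that exist only in a child loader. A minimal, self-contained sketch (class and method names are illustrative):

```java
import java.net.URL;
import java.net.URLClassLoader;

public class LoaderVisibilityDemo {

    // True if the named class is resolvable through the given loader.
    public static boolean canSee(ClassLoader cl, String className) {
        try {
            Class.forName(className, false, cl);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Isolated loader: no own URLs, only the bootstrap loader as parent.
        ClassLoader isolated = new URLClassLoader(new URL[0], null);

        // Bootstrap classes are visible everywhere...
        System.out.println(canSee(isolated, "java.lang.String"));     // true
        // ...but application classes are not visible to the isolated loader,
        // just as the child loader's Serializer is invisible to the system loader.
        System.out.println(canSee(isolated, "LoaderVisibilityDemo")); // false
    }
}
```

The corresponding fix would presumably be to load URISerializer through the same JarClassLoader that loaded Kryo (via `cl.loadClass(...)`) instead of referencing the class directly, so that its reference to Serializer resolves in that loader.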


(3) Using an intermediate serialization

Currently the most promising approach seems to be using an independent intermediate serialization, e.g. JSON using Gson or similar, and then running two separate jobs:

  1. kryo:2.21.2-ourpatchbranch in our regular store -> JSON in a temporary store
  2. JSON in the temporary store -> kryo:2.22 in our regular store

Problems
The biggest problem with this solution is that it roughly doubles the space consumption of the data being processed. Moreover, we would need another serialization method that works without problems on all of our data, which we would have to investigate first.
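Whatever intermediate format is chosen, the property to verify is that it round-trips every stored object graph faithfully. A minimal sketch of the two pipeline steps, using Java's built-in serialization as a hedged stand-in for the actual intermediate format (Gson or similar):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class IntermediateRoundTrip {

    // Job 1 of the pipeline: restored old-Kryo objects -> intermediate bytes.
    public static byte[] toIntermediate(Serializable obj) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(obj);
        }
        return baos.toByteArray();
    }

    // Job 2: intermediate bytes -> objects, ready for new-Kryo serialization.
    public static Object fromIntermediate(byte[] bytes)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }
}
```

Any format works here as long as `fromIntermediate(toIntermediate(x)).equals(x)` holds for all stored object types; that is exactly the property that would need to be investigated first.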

Houdan answered 18/4, 2013 at 12:41 Comment(7)
I'd have recommended the 3rd option at first glance. Do you have a version number or something in your payload?Goosy
Concerning (2): why aren't you calling kyro.addDefaultSerializer(URI.class, URISerializer.class) directly? Why use reflection? And which class causes the NoClassDefFoundError?Goosy
@Goosy No, we don't have a version number in our stored data. Concerning (2): I extended the description of the problem in the question. At this point I used reflection, because the test project did not depend on the Kryo artifact and therefore, the Kryo type was not known at compile-time.Haggi
So where exactly is the huge problem? Create two MR jobs, add your custom Kryo to the classpath for the first one, which dumps Kryo -> SequenceFile or something else that the new Kryo can then pick up, and use the second job with the new Kryo version in the classpath to dump to the new Kryo format...Liddy
@Liddy The problems with this approach are outlined in (3) in the question - we are dealing with huge amounts of data, and this approach consumes double the time and space that an optimal solution would. Additionally, we need another serialization format besides Kryo for writing the objects to files.Haggi
@MichaelSchmeißer: so your core problem is how to read the old format and write the new - why not create another version of kryo that can read your old format and patch it as necessary from the new kryo to write the new? Without knowing the internals of kryo, I don't know if that would be a ludicrous proposition. But, if it is doable, it would allow you to avoid package renaming issues, multiple classloader fun and temporary, duplicate storage in some other serialized format.Ology
@Ology This would be really tricky and time-consuming, because we would potentially have to look at the diff of both versions, and a lot of things can go wrong - Kryo is a quite complex library (as I guess all serialization frameworks are). If it were less complex, however, I would give your idea a shot.Haggi

I would use the multiple classloaders approach.

(Package renaming would also work. It does seem ugly, but this is a one-off hack, so beauty and correctness can take a back seat. Intermediate serialization seems risky - there was a reason you chose Kryo, and that reason would be negated by using a different intermediate form.)

The overall design would be:

child classloaders:      Old Kryo     New Kryo   <-- both with simple wrappers
                                \       /
                                 \     /
                                  \   /
                                   \ /
                                    |
default classloader:    domain model; controller for the re-serialization
  1. Load the domain object classes in the default classloader
  2. Load a Jar with the modified Kryo version and wrapper code. The wrapper has a static 'main' method with one argument: The name of the file to deserialize. Call the main method via reflection from the default classloader:

        Class<?> deserializer = deserializerClassLoader.loadClass("com.example.deserializer.Main");
        Method mainIn = deserializer.getMethod("main", String.class);
        Object graph = mainIn.invoke(null, "/path/to/input/file");
    
    1. This method:
      1. Deserializes the file as one object graph
      2. Places the object into a shared space. A ThreadLocal is a simple way; alternatively, it can simply be returned to the caller, as shown above.
  3. When the call returns, load a second Jar with the new serialization framework and a simple wrapper. The wrapper has a static 'main' method taking the object graph and the name of the file to serialize to. Call the main method via reflection from the default classloader:

        Class<?> serializer = serializerClassLoader.loadClass("com.example.serializer.Main");
        Method mainOut = serializer.getMethod("main", Object.class, String.class);
        mainOut.invoke(null, graph, "/path/to/output/file");
    
    1. This method
      1. Retrieves the object from the ThreadLocal
      2. Serializes the object and writes it to the file
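The "shared space" handoff between the two wrappers can be sketched as a holder class that lives in the default classloader, so that both child loaders see the same instance (the class name is illustrative):

```java
// Lives in the default classloader, visible to both wrapper jars.
public class GraphHolder {

    private static final ThreadLocal<Object> CURRENT = new ThreadLocal<>();

    // Called by the deserializer wrapper after reading a file.
    public static void put(Object graph) {
        CURRENT.set(graph);
    }

    // Called by the serializer wrapper before writing; clears the slot
    // so that no graph leaks across files.
    public static Object take() {
        Object graph = CURRENT.get();
        CURRENT.remove();
        return graph;
    }
}
```

Returning the graph directly from the 'main' method, as in the snippets above, avoids even this shared class; the holder is only needed if the wrappers cannot exchange return values.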

Considerations

In the code fragments, one classloader is created for each object serialization and deserialization. You probably want to create the classloaders only once, discover the main methods, and loop over the files, something like:

for (String file: files) {
    Object graph = mainIn.invoke(null, file + ".in");
    mainOut.invoke(null, graph, file + ".out");
}

Do the domain objects have any reference to any Kryo class? If so, you have difficulties:

  1. If the reference is just a class reference, e.g. to call a method, then the first use of the class will load one of the two Kryo versions into the default classloader. This will probably cause problems, as part of the serialization or deserialization might be performed by the wrong version of Kryo.
  2. If the reference is used to instantiate any Kryo objects and store the reference in the domain model (class or instance members), then Kryo will actually be serializing part of itself in the model. This may be a deal-breaker for this approach.

In either case, your first step should be to examine these references and eliminate them. One way to ensure that you have done this is to make sure the default classloader does not have access to any Kryo version. If the domain objects reference Kryo in any way, the reference will fail (with a NoClassDefFoundError if the class is referenced directly, or a ClassNotFoundException if reflection is used).

Upas answered 24/4, 2013 at 0:36 Comment(0)

For (2), you can create two JAR files that contain the serializer and all the dependencies for the old and new versions of your serializer, as shown here. Then create a MapReduce job that loads each version of your code in a separate classloader, and add some glue code in the middle which deserializes with the old code, then serializes with the new code.

You will have to be careful that your domain object is loaded in the same class loader as your glue code, and the code to serialize/deserialize depends on the same class loader as your glue code so that they both see the same domain object class.
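The reflective surface can indeed be kept small: one entry-point class per JAR, invoked through a single generic helper in the glue code. A sketch (demonstrated below with a stdlib class standing in for the real entry points, which would live in the serializer JARs):

```java
import java.lang.reflect.Method;

public class ReflectiveBridge {

    // Invokes a static method on a class resolved through the given loader.
    // This is the only reflective call the glue code needs per JAR.
    public static Object invokeStatic(ClassLoader cl, String className,
                                      String methodName, Class<?>[] paramTypes,
                                      Object... args) throws Exception {
        Class<?> entryPoint = cl.loadClass(className);
        Method m = entryPoint.getMethod(methodName, paramTypes);
        return m.invoke(null, args); // null receiver: static method
    }

    public static void main(String[] args) throws Exception {
        // Stand-in demonstration with a stdlib class; in the real job the
        // loader would be the per-version JarClassLoader and the class the
        // wrapper's entry point.
        Object n = invokeStatic(ClassLoader.getSystemClassLoader(),
                "java.lang.Integer", "parseInt",
                new Class<?>[] { String.class }, "42");
        System.out.println(n); // 42
    }
}
```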

Barracks answered 23/4, 2013 at 5:8 Comment(2)
Would you suggest the "glue code" to use both implementations via reflection as shown in my code example or is there any better way?Haggi
You will need to use reflection, as you are doing now, for the glue code, since you can't access the classes directly - they will be loaded in a different classloader. You can probably get the reflection down to one method in each JAR.Barracks

The easiest way that comes to mind is using an additional Java application to do the transformation for you: send the binary data to the secondary Java application (simple local sockets would do the trick nicely), so you do not have to fiddle with classloaders or packages.

The only thing to think about is the intermediate representation. You might want to use another serialization mechanism, or, if time is no issue, Java's built-in serialization.

Using a second Java application saves you from dealing with temporary storage and lets you do everything in memory.
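The socket handoff can be sketched in a few lines: the secondary application listens on a loopback port, reads a length-prefixed payload, transforms it, and writes the result back. The transform below is a placeholder identity function where the old-Kryo decode / new-Kryo encode would go; the framing protocol is an assumption for illustration.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class TransformService {

    // Placeholder: the real service would deserialize with the old Kryo
    // and reserialize with the new one.
    static byte[] transform(byte[] oldFormat) {
        return oldFormat;
    }

    // Starts the service on an ephemeral loopback port; returns the port.
    // Sketch only: handles a single connection.
    public static int start() throws IOException {
        ServerSocket server = new ServerSocket(0);
        Thread worker = new Thread(() -> {
            try (Socket s = server.accept();
                 DataInputStream in = new DataInputStream(s.getInputStream());
                 DataOutputStream out = new DataOutputStream(s.getOutputStream())) {
                byte[] payload = new byte[in.readInt()]; // length-prefixed frame
                in.readFully(payload);
                byte[] result = transform(payload);
                out.writeInt(result.length);
                out.write(result);
            } catch (IOException ignored) {
            }
        });
        worker.setDaemon(true);
        worker.start();
        return server.getLocalPort();
    }

    // Client side, as called from the Hadoop job.
    public static byte[] convert(int port, byte[] oldFormat) throws IOException {
        try (Socket s = new Socket("127.0.0.1", port);
             DataOutputStream out = new DataOutputStream(s.getOutputStream());
             DataInputStream in = new DataInputStream(s.getInputStream())) {
            out.writeInt(oldFormat.length);
            out.write(oldFormat);
            out.flush();
            byte[] result = new byte[in.readInt()];
            in.readFully(result);
            return result;
        }
    }
}
```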

And once you have the sockets + second application code in place, you will find tons of situations where it comes in handy.

Alternatively, one can build a local cluster using JGroups and avoid the hassle of raw sockets altogether. JGroups is the simplest communication API I know of: just form a logical channel and check who joins. Best of all, it even works within the same JVM, which makes testing easy, and when used remotely it can bind different physical servers together the same way it works for local applications.

Another viable alternative is using ZeroMQ with its ipc (inter-process communication) transport.

Gaiser answered 4/7, 2015 at 19:16 Comment(0)
