Problem description
We have a Hadoop cluster on which we store data serialized to bytes with Kryo (a serialization framework). The Kryo version we used for this was forked from the official release 2.21 to apply our own patches for issues we had experienced with Kryo. The current Kryo version 2.22 also fixes these issues, but with different solutions. As a result, we cannot simply change the Kryo version we use, because we would then no longer be able to read the data already stored on our Hadoop cluster. To address this problem, we want to run a Hadoop job which
- reads the stored data
- deserializes the data stored with the old version of Kryo
- serializes the restored objects with the new version of Kryo
- writes the new serialized representation back to our data store
The problem is that it is not trivial to use two different versions of the same class in one Java program (more precisely, in a Hadoop job's mapper class).
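For illustration, the conversion mapper would conceptually look like the following sketch; the BytesWritable key/value types and the oldDeserialize/newSerialize placeholders are assumptions made for the sketch, not part of our actual code:

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Mapper;

// Conceptual sketch only: the two placeholder methods stand for the two Kryo versions.
public abstract class ConvertMapper
        extends Mapper<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {

    // deserialize with kryo:2.21.2-ourpatchbranch (placeholder)
    protected abstract Object oldDeserialize(byte[] bytes);

    // serialize with kryo:2.22 (placeholder)
    protected abstract byte[] newSerialize(Object restored);

    @Override
    protected void map(BytesWritable key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        // copy only the valid portion of the backing array
        byte[] oldBytes = Arrays.copyOf(value.getBytes(), value.getLength());
        context.write(key, new BytesWritable(newSerialize(oldDeserialize(oldBytes))));
    }
}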
Question in a nutshell
How is it possible to deserialize and serialize an object with two different versions of the same serialization framework in one Hadoop job?
Relevant facts overview
- We have data stored on a Hadoop CDH4 cluster, serialized with our patched Kryo version 2.21.2-ourpatchbranch
- We want to have the data serialized with Kryo version 2.22, which is incompatible with our version
- We build our Hadoop job JARs with Apache Maven
Possible (and impossible) approaches
(1) Renaming packages
The first approach that came to mind was to rename the packages in our own Kryo branch using the relocation functionality of the Maven Shade plugin and to release it under a different artifact ID, so that our conversion job project could depend on both artifacts. We would then instantiate one Kryo object of the old version and one of the new version, using the old one for deserialization and the new one for serializing the objects again.
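With the relocation in place, the two versions could coexist in one mapper roughly like in the following sketch; the shade prefix ourcompany.shaded, the Foo type and the oldBytes variable are made up for illustration:

// old, relocated Kryo for deserialization (oldBytes holds the stored representation)
ourcompany.shaded.com.esotericsoftware.kryo.Kryo oldKryo =
        new ourcompany.shaded.com.esotericsoftware.kryo.Kryo();
Object restored = oldKryo.readObject(
        new ourcompany.shaded.com.esotericsoftware.kryo.io.Input(oldBytes), Foo.class);

// new Kryo 2.22 for serialization
com.esotericsoftware.kryo.Kryo newKryo = new com.esotericsoftware.kryo.Kryo();
com.esotericsoftware.kryo.io.Output output = new com.esotericsoftware.kryo.io.Output(4096, -1);
newKryo.writeObject(output, restored);
byte[] newBytes = output.toBytes();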
Problems
We don't use Kryo explicitly in Hadoop jobs, but rather access it through multiple layers of our own libraries. For each of these libraries, it would be necessary to
- rename involved packages and
- create a release with a different group or artifact ID
To make things even more messy, we also use Kryo serializers provided by other 3rd party libraries for which we would have to do the same thing.
(2) Using multiple class loaders
The second approach we came up with was to not depend on Kryo at all in the Maven project containing the conversion job, but to load the required classes at runtime from a version-specific JAR stored in Hadoop's distributed cache. Serializing an object would then look something like this:
public byte[] serialize(Object foo, JarClassLoader cl) throws Exception {
    // load the Kryo class from the version-specific JAR and instantiate it
    final Class<?> kryoClass = cl.loadClass("com.esotericsoftware.kryo.Kryo");
    final Object kryo = kryoClass.getConstructor().newInstance();

    // create a Kryo Output wrapping a ByteArrayOutputStream
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    final Class<?> outputClass = cl.loadClass("com.esotericsoftware.kryo.io.Output");
    final Object output = outputClass.getConstructor(OutputStream.class).newInstance(baos);

    // invoke Kryo#writeObject(Output, Object) reflectively, then close the Output
    final Method writeObject = kryoClass.getMethod("writeObject", outputClass, Object.class);
    writeObject.invoke(kryo, output, foo);
    outputClass.getMethod("close").invoke(output);
    baos.close();

    return baos.toByteArray();
}
Problems
Though this approach might work for instantiating an unconfigured Kryo object and serializing or restoring simple objects, our actual Kryo configuration is much more complex: it includes several custom serializers, registered class IDs, and so on. For example, we were unable to find a way to register custom serializers for classes without getting a NoClassDefFoundError - the following code does not work:
// Kryo is loaded through the version-specific class loader
Class<?> kryoClass = this.loadClass("com.esotericsoftware.kryo.Kryo");
Object kryo = kryoClass.getConstructor().newInstance();
Method addDefaultSerializer = kryoClass.getMethod("addDefaultSerializer", Class.class, Class.class);
addDefaultSerializer.invoke(kryo, URI.class, URISerializer.class); // throws NoClassDefFoundError
The last line throws a
java.lang.NoClassDefFoundError: com/esotericsoftware/kryo/Serializer
because the URISerializer class references Kryo's Serializer class and tries to load it using its own class loader (the system class loader), which does not know the Serializer class.
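A possible (untested) workaround might be to load the serializer class through the same class loader that loaded Kryo, provided the serializer classes are also packaged into the version-specific JAR; the class name below is just an example, not our real package:

// load URISerializer via the Kryo class loader so that its reference to
// com.esotericsoftware.kryo.Serializer resolves against the same loader
Class<?> serializerClass = this.loadClass("com.ourcompany.serialization.URISerializer");
addDefaultSerializer.invoke(kryo, URI.class, serializerClass);

Whether this scales to our full configuration (all custom serializers and their dependencies living behind the JarClassLoader) is exactly what we are unsure about.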
(3) Using an intermediate serialization
Currently the most promising approach seems to be to use an independent intermediate serialization, e.g. JSON using Gson or similar, and then run two separate jobs (a rough sketch of the first pass follows the list):
- kryo:2.21.2-ourpatchbranch in our regular store -> JSON in a temporary store
- JSON in the temporary store -> kryo:2.22 in our regular store
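A minimal sketch of the first pass, assuming the usual Kryo 2.x and Gson APIs; Foo and storedBytes are placeholders for our real types and data, and the second pass would do the reverse with gson.fromJson and Kryo 2.22:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.google.gson.Gson;

// only kryo:2.21.2-ourpatchbranch is on this job's classpath
Kryo oldKryo = new Kryo();          // configured with our serializers and class IDs
Gson gson = new Gson();
Foo foo = oldKryo.readObject(new Input(storedBytes), Foo.class);
String json = gson.toJson(foo);     // written to the temporary store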
Problems
The biggest problem with this solution is that it roughly doubles the space consumption of the processed data. Moreover, we would need a second serialization method that works reliably on all of our data, which we would have to evaluate first.
Comments
- Why not call kryo.addDefaultSerializer(URI.class, URISerializer.class) directly? Why use reflection? And which class causes the NoClassDefFoundError? – Goosy
- Because the Kryo type is not known at compile time. – Haggi