Merge two avro schemas programmatically
I have two similar schemas where only one nested field changes (it is called onefield in schema1 and anotherfield in schema2).

schema1

{
    "type": "record",
    "name": "event",
    "namespace": "foo",
    "fields": [
        {
            "name": "metadata",
            "type": {
                "type": "record",
                "name": "event",
                "namespace": "foo.metadata",
                "fields": [
                    {
                        "name": "onefield",
                        "type": [
                            "null",
                            "string"
                        ],
                        "default": null
                    }
                ]
            },
            "default": null
        }
    ]
}

schema2

{
    "type": "record",
    "name": "event",
    "namespace": "foo",
    "fields": [
        {
            "name": "metadata",
            "type": {
                "type": "record",
                "name": "event",
                "namespace": "foo.metadata",
                "fields": [
                    {
                        "name": "anotherfield",
                        "type": [
                            "null",
                            "string"
                        ],
                        "default": null
                    }
                ]
            },
            "default": null
        }
    ]
}

I am able to programmatically merge both schemas using Avro 1.8.0:

import org.apache.avro.Schema;
// AvroStorageUtils comes from Pig's piggybank library
import org.apache.pig.piggybank.storage.avro.AvroStorageUtils;

Schema s1 = new Schema.Parser().parse(schema1);
Schema s2 = new Schema.Parser().parse(schema2);
Schema[] schemas = {s1, s2};

// Fold the schemas pairwise into a single merged schema
Schema mergedSchema = null;
for (Schema schema : schemas) {
    mergedSchema = AvroStorageUtils.mergeSchema(mergedSchema, schema);
}

and use it to convert an input JSON into an Avro or JSON representation:

import java.nio.charset.StandardCharsets;

// JsonAvroConverter comes from the allegro json2avro-converter library
import tech.allegro.schema.json2avro.converter.AvroConversionException;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

JsonAvroConverter converter = new JsonAvroConverter();
try {
    byte[] example = "{}".getBytes(StandardCharsets.UTF_8);
    byte[] avro = converter.convertToAvro(example, mergedSchema);
    byte[] json = converter.convertToJson(avro, mergedSchema);
    System.out.println(new String(json, StandardCharsets.UTF_8));
} catch (AvroConversionException e) {
    e.printStackTrace();
}

That code shows the expected output: {"metadata":{"onefield":null,"anotherfield":null}}. The issue is that I am not able to see the merged schema itself. If I do a simple System.out.println(mergedSchema), I get the following exception:

Exception in thread "main" org.apache.avro.SchemaParseException: Can't redefine: merged schema (generated by AvroStorage).merged
    at org.apache.avro.Schema$Names.put(Schema.java:1127)
    at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:561)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:689)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:715)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:700)
    at org.apache.avro.Schema.toString(Schema.java:323)
    at org.apache.avro.Schema.toString(Schema.java:313)
    at java.lang.String.valueOf(String.java:2982)
    at java.lang.StringBuilder.append(StringBuilder.java:131)

I call it the Avro uncertainty principle :). It looks like Avro is able to work with the merged schema, but it fails when it tries to serialize the schema to JSON. The merge works with simpler schemas, so it sounds like a bug in Avro 1.8.0 to me.

Do you know what could be happening or how to solve it? Any workaround (e.g. an alternative Schema serializer) is welcome.
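
In the meantime, the only way I have found to inspect the merged schema is to avoid toString() entirely and walk it with the Schema API, since toString() is what triggers the name check. A minimal sketch (SchemaPrinter is my own hypothetical helper, not part of Avro, and it does not guard against recursive schemas):

import org.apache.avro.Schema;

// Hypothetical helper: prints a schema tree through the Schema API instead of
// Schema.toString(), so Avro's JSON serializer (and its name check) never runs.
public class SchemaPrinter {
    public static void print(Schema schema, String indent) {
        switch (schema.getType()) {
            case RECORD:
                System.out.println(indent + "record " + schema.getFullName());
                for (Schema.Field field : schema.getFields()) {
                    System.out.println(indent + "  ." + field.name());
                    print(field.schema(), indent + "    ");
                }
                break;
            case UNION:
                System.out.println(indent + "union");
                for (Schema branch : schema.getTypes()) {
                    print(branch, indent + "  ");
                }
                break;
            default:
                System.out.println(indent + schema.getType().getName());
        }
    }
}

Calling SchemaPrinter.print(mergedSchema, "") at least shows the structure, even though the output is not valid Avro JSON.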

Enzyme answered 10/4, 2016 at 11:56 Comment(1)
It seems to be happening in previous versions of Avro (1.7.6) too: mail-archives.apache.org/mod_mbox/avro-user/201406.mbox/… – Enzyme

I found the same issue with the Pig util class... there are actually two bugs here:

  • Avro allows serializing data through GenericDatumWriter using an invalid schema
  • The piggybank util class generates invalid schemas because it uses the same name/namespace for all the merged fields (instead of keeping the original names), as the sketch below illustrates
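
To see why Avro rejects such a schema, here is a minimal sketch; the schema literal below is made up purely to trigger the check. It defines the same record name twice, which the parser forbids, and the same name bookkeeping runs when a Schema is serialized back to JSON, which is why toString() blows up on the piggybank result.

import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;

public class RedefineDemo {
    public static void main(String[] args) {
        // A record named "merged" whose field type is another record that is
        // also named "merged" -- the inner definition is a redefinition.
        String invalid = "{\"type\":\"record\",\"name\":\"merged\",\"fields\":["
                + "{\"name\":\"a\",\"type\":"
                + "{\"type\":\"record\",\"name\":\"merged\",\"fields\":[]}}]}";
        try {
            new Schema.Parser().parse(invalid);
        } catch (SchemaParseException e) {
            System.out.println(e.getMessage()); // Can't redefine: merged
        }
    }
}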

This works properly even for more complex scenarios: https://github.com/kite-sdk/kite/blob/master/kite-data/kite-data-core/src/main/java/org/kitesdk/data/spi/SchemaUtil.java#L511

    Schema mergedSchema = SchemaUtil.merge(s1, s2);
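
For completeness, a self-contained sketch of the merge, assuming the two schemas from the question are saved as schema1.avsc and schema2.avsc and kite-data-core is on the classpath (note that org.kitesdk.data.spi.SchemaUtil is an internal Kite class):

import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.kitesdk.data.spi.SchemaUtil;

public class KiteMergeExample {
    public static void main(String[] args) throws IOException {
        // schema1.avsc and schema2.avsc hold the two schemas from the question
        Schema s1 = new Schema.Parser().parse(new File("schema1.avsc"));
        Schema s2 = new Schema.Parser().parse(new File("schema2.avsc"));

        Schema merged = SchemaUtil.merge(s1, s2);

        // Unlike the piggybank result, this schema serializes cleanly
        System.out.println(merged.toString(true));
    }
}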

From your example, I get the following output:

{
  "type": "record",
  "name": "event",
  "namespace": "foo",
  "fields": [
    {
      "name": "metadata",
      "type": {
        "type": "record",
        "name": "event",
        "namespace": "foo.metadata",
        "fields": [
          {
            "name": "onefield",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "anotherfield",
            "type": [
              "null",
              "string"
            ],
            "default": null
          }
        ]
      },
      "default": null
    }
  ]
}

Hopefully this will help others.

Imtiaz answered 18/7, 2017 at 17:54 Comment(1)
Thanks @lake. I'm not able to try it, but it looks really good. – Enzyme

A schema-merge facility is not supported for Avro files yet. But say you have a directory (e.g. /demo) containing multiple Avro files with different schemas: you can read them through Spark and provide one master schema file (an .avsc file). Spark will then internally read all the records from the files, and if a column is missing from any file it will show a null value for it.

import java.io.File

import org.apache.avro.Schema
import org.apache.spark.sql.SparkSession

object AvroSchemaEvolution {
  def main(args: Array[String]): Unit = {
    val schema = new Schema.Parser().parse(
      new File("C:\\Users\\murtazaz\\Documents\\Avro_Schema_Evolution\\schema\\emp_inserted.avsc"))
    val spark = SparkSession.builder().master("local").getOrCreate()
    spark.read
      .format("com.databricks.spark.avro")   // spark-avro data source
      .option("avroSchema", schema.toString) // master schema applied to every file
      .load("C:\\Users\\murtazaz\\Documents\\Avro_Schema_Evolution\\demo")
      .show()
  }
}
Unripe answered 20/6, 2018 at 5:39 Comment(0)
