Apache Beam Coder for GenericRecord
Asked Answered
D

2

7

I am building a pipeline that reads Avro generic records. To pass GenericRecord between stages I need to register AvroCoder. The documentation says that if I use generic record, the schema argument can be arbitrary: https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/coders/AvroCoder.html#of-java.lang.Class-org.apache.avro.Schema-

However, when I pass an empty schema to the method AvroCoder.of(Class, Schema) it throws an exception at run time. Is there a way to create an AvroCoder for GenericRecord that does not require a schema? In my case, each GenericRecord has an embedded schema.

The exception and stacktrace:

Exception in thread "main" java.lang.NullPointerException
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.checkIndexedRecord(AvroCoder.java:562)
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:430)
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.check(AvroCoder.java:409)
at org.apache.beam.sdk.coders.AvroCoder.<init>(AvroCoder.java:260)
at org.apache.beam.sdk.coders.AvroCoder.of(AvroCoder.java:141)
Dotson answered 13/12, 2018 at 14:53 Comment(2)
What is the exception that is thrown?Earthshine
@KennKnowles Exception in thread "main" java.lang.NullPointerException at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.checkIndexedRecord(AvroCoder.java:562) at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:430) at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.check(AvroCoder.java:409) at org.apache.beam.sdk.coders.AvroCoder.<init>(AvroCoder.java:260) at org.apache.beam.sdk.coders.AvroCoder.of(AvroCoder.java:141)Dotson
D
4

I had a similar case and solved it with custom coder. The simplest (but sub-efficient) solution would be to encode schema along with each record. If your schemas are not too volatile you can get benefit of caching.

public class GenericRecordCoder extends AtomicCoder<GenericRecord> {
    public static GenericRecordCoder of() {
        return new GenericRecordCoder();
    }
    private static final ConcurrentHashMap<String, AvroCoder<GenericRecord>> avroCoders = new ConcurrentHashMap<>();

    @Override
    public void encode(GenericRecord value, OutputStream outStream) throws IOException {
        String schemaString = value.getSchema().toString();
        String schemaHash = getHash(schemaString);
        StringUtf8Coder.of().encode(schemaString, outStream);
        StringUtf8Coder.of().encode(schemaHash, outStream);
        AvroCoder<GenericRecord> coder = avroCoders.computeIfAbsent(schemaHash, 
            s -> AvroCoder.of(value.getSchema()));
        coder.encode(value, outStream);
    }

    @Override
    public GenericRecord decode(InputStream inStream) throws IOException {
        String schemaString = StringUtf8Coder.of().decode(inStream);
        String schemaHash = StringUtf8Coder.of().decode(inStream);
        AvroCoder<GenericRecord> coder = avroCoders.computeIfAbsent(schemaHash, 
             s -> AvroCoder.of(new Schema.Parser().parse(schemaString)));
        return coder.decode(inStream);
    }
}

While this solves the task, in fact I made it slightly different, using external schema registry (you can build this on the top of datastore for example). In this case you don't need to serialize/deserialize schema. The code looks like:

public class GenericRecordCoder extends AtomicCoder<GenericRecord> {
    public static GenericRecordCoder of() {
        return new GenericRecordCoder();
    }
    private static final ConcurrentHashMap<String, AvroCoder<GenericRecord>> avroCoders = new ConcurrentHashMap<>();

    @Override
    public void encode(GenericRecord value, OutputStream outStream) throws IOException {
        SchemaRegistry.registerIfAbsent(value.getSchema());
        String schemaName = value.getSchema().getFullName();
        StringUtf8Coder.of().encode(schemaName, outStream);
        AvroCoder<GenericRecord> coder = avroCoders.computeIfAbsent(schemaName, 
            s -> AvroCoder.of(value.getSchema()));
        coder.encode(value, outStream);
    }

    @Override
    public GenericRecord decode(InputStream inStream) throws IOException {
        String schemaName = StringUtf8Coder.of().decode(inStream);
        AvroCoder<GenericRecord> coder = avroCoders.computeIfAbsent(schemaName, 
             s -> AvroCoder.of(SchemaRegistry.get(schemaName)));
        return coder.decode(inStream);
    }
}

The usage is pretty straightforward:

PCollection<GenericRecord> inputCollection = pipeline
    .apply(AvroIO
           .parseGenericRecords(t -> t)
           .withCoder(GenericRecordCoder.of())
           .from(...));
Duteous answered 27/9, 2019 at 20:1 Comment(0)
L
1

After reviewing the code for AvroCoder, I do not think the documentation is correct there. Your AvroCoder instance will need a way to figure out the schema for your Avro records - and likely the only way to do that is by providing one.

So, I'd recommend calling AvroCoder.of(GenericRecord.class, schema), where schema is the correct schema for the GenericRecord objects in your PCollection.

Loganloganberry answered 25/9, 2019 at 0:17 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.