Writing a protobuf object to Parquet using Apache Beam
I fetch protobuf data from Google Pub/Sub and deserialize it into a Message object, so I get a PCollection<Message>. Here is sample code:

public class ProcessPubsubMessage extends DoFn<PubsubMessage, Message> {

    @ProcessElement
    public void processElement(@Element PubsubMessage element, OutputReceiver<Message> receiver) {
        // Deserialize the Pub/Sub payload into the protobuf-generated Message class.
        byte[] payload = element.getPayload();
        try {
            Message message = Message.parseFrom(payload);
            receiver.output(message);
        } catch (InvalidProtocolBufferException e) {
            LOG.error("Got exception while parsing message from Pub/Sub. Exception => " + e.getMessage());
        }
    }
}
PCollection<Message> event = psMessage.apply("Parsing data from pubsub message",
                ParDo.of(new ProcessPubsubMessage()));

I want to apply a transformation on the PCollection<Message> event to write it in Parquet format. I know Apache Beam provides ParquetIO, but it works with PCollection<GenericRecord>, so converting from Message to GenericRecord might solve the problem (yet I don't know how to do that). Is there any easy way to write in Parquet format?
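For reference, writing a PCollection<GenericRecord> with ParquetIO looks roughly like this (a minimal sketch: records, eventSchema, and the output path are placeholders, and the beam-sdks-java-io-parquet module is assumed to be on the classpath):

// Minimal sketch: `records` is a PCollection<GenericRecord> and `eventSchema` its Avro schema.
records.apply("Write to Parquet",
        FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(eventSchema))
                .to("gs://my-bucket/output/")   // placeholder output location
                .withSuffix(".parquet"));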

Whoopee answered 18/9, 2018 at 9:4 Comment(0)
It can be solved by using the following library:

<dependency>
     <groupId>org.apache.avro</groupId>
     <artifactId>avro-protobuf</artifactId>
     <version>1.7.7</version>
</dependency>

private GenericRecord getGenericRecord(Event event) throws IOException {
    // Write the protobuf object as Avro binary, using the schema derived from the proto class.
    ProtobufDatumWriter<Event> datumWriter = new ProtobufDatumWriter<Event>(Event.class);
    ByteArrayOutputStream os = new ByteArrayOutputStream();
    Encoder e = EncoderFactory.get().binaryEncoder(os, null);
    datumWriter.write(event, e);
    e.flush();

    // Read the bytes back with a GenericDatumReader to obtain a GenericRecord
    // carrying the same protobuf-derived Avro schema.
    ProtobufDatumReader<Event> datumReader = new ProtobufDatumReader<Event>(Event.class);
    GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<GenericRecord>(datumReader.getSchema());
    GenericRecord record = genericDatumReader.read(null, DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(os.toByteArray()), null));
    return record;
}

For details: https://gist.github.com/alexvictoor/1d3937f502c60318071f
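Wired into the pipeline, the conversion could look something like the sketch below. ToGenericRecord is a hypothetical name, Event stands for the protobuf-generated class, and getGenericRecord is the helper above (assumed to be accessible, e.g. made static):

// Hypothetical DoFn wrapping the getGenericRecord helper shown above.
static class ToGenericRecord extends DoFn<Event, GenericRecord> {
    @ProcessElement
    public void processElement(@Element Event event, OutputReceiver<GenericRecord> out) throws IOException {
        out.output(getGenericRecord(event));
    }
}

// In the pipeline, assuming a PCollection<Event> named `events`:
PCollection<GenericRecord> records = events
        .apply("Protobuf to GenericRecord", ParDo.of(new ToGenericRecord()));
// A coder for GenericRecord still has to be set explicitly (see the comments below);
// the result can then be written with FileIO.write().via(ParquetIO.sink(schema)).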

Whoopee answered 27/9, 2018 at 18:11 Comment(2)
Thanks for sharing your answer! I'm trying to do the exact same thing, but I seem to still have an issue as I need to explicitly pass an AvroCoder, which requires the message schema. How did you address this issue? I couldn't find it in your gist. – Retinitis
This is one way to get the schema for an AvroCoder: gist.github.com/alexvictoor/… In other words, if you have a protobuf-generated class, during pipeline construction you can create a ProtobufDatumReader and get the schema from it. – Aubert
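A rough sketch of that suggestion, reusing the hypothetical Event class and records PCollection<GenericRecord> from the snippets above:

// Derive the Avro schema from the protobuf-generated class at pipeline construction time.
Schema avroSchema = new ProtobufDatumReader<Event>(Event.class).getSchema();
// Use it for the coder; the same schema can be passed to ParquetIO.sink(avroSchema) when writing.
records.setCoder(AvroCoder.of(GenericRecord.class, avroSchema));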
