How to convert an Avro GenericRecord to valid JSON while converting timestamp fields from milliseconds to datetime?
Asked Answered
F

2

7

How can I convert an Avro GenericRecord to JSON while converting timestamp fields from milliseconds to datetime?

Currently using Avro 1.8.2

    // Epoch-millisecond value used as the sample timestamp.
    // Uppercase 'L' suffix: a lowercase 'l' is easily misread as the digit 1.
    Timestamp tsp = new Timestamp(1530228588182L);

    // Record schema with a single "tsp" field: a long annotated with the
    // timestamp-millis logical type, without a default value.
    Schema schema  = SchemaBuilder.builder()
            .record("hello")
            .fields()
            .name("tsp").type(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))).noDefault()
            .endRecord();
    System.out.println(schema.toString());

    // The record stores the raw long (milliseconds), not a date-time object.
    GenericRecord genericRecord = new GenericData.Record(schema);
    genericRecord.put("tsp",tsp.getTime()); //Assume I cannot change this
    System.out.println(genericRecord.toString());

I tried using the function below, but the result is the same as genericRecord.toString():

/**
 * Encodes a record with Avro's JSON encoder and returns the encoded text.
 *
 * @param schema        schema handed to both the datum writer and the JSON encoder
 * @param genericRecord record to encode
 * @return the text produced by the JSON encoder, decoded as UTF-8
 * @throws IOException if writing the record or flushing the encoder fails
 */
public static String toJsonString(Schema schema, GenericRecord genericRecord) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
    writer.getData().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
    JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, baos, false);
    writer.write(genericRecord, encoder);
    // The encoder buffers its output; flush before reading the bytes back.
    encoder.flush();
    // Decode explicitly as UTF-8 instead of relying on the platform default charset,
    // which would garble non-ASCII field values on JVMs whose default is not UTF-8.
    return new String(baos.toByteArray(), java.nio.charset.StandardCharsets.UTF_8);
}

Third Attempt

/**
 * Reads one record from Avro binary data using a customised {@link GenericData}.
 *
 * <p>The custom instance overrides {@code toString(Object)} so that
 * {@code java.sql} timestamp, time and date values are wrapped in double quotes,
 * and it registers the timestamp and time logical-type conversions.
 *
 * @param schema schema used as both writer and reader schema
 * @param data   Avro binary-encoded bytes of a single record
 * @return the decoded record
 * @throws IOException if decoding fails
 */
public static GenericRecord deserialize(final Schema schema, byte[] data) throws IOException {
    final GenericData quotingData = new GenericData() {
        @Override
        public String toString(Object datum) {
            // These types are otherwise emitted unquoted, which yields malformed JSON.
            final boolean isSqlTemporal = datum instanceof java.sql.Timestamp
                    || datum instanceof java.sql.Time
                    || datum instanceof java.sql.Date;
            return isSqlTemporal ? "\"" + datum + "\"" : super.toString(datum);
        }
    };
    quotingData.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
    quotingData.addLogicalTypeConversion(new TimeConversions.TimeConversion());

    try (final InputStream in = new ByteArrayInputStream(data)) {
        final Decoder binaryDecoder = DecoderFactory.get().binaryDecoder(in, null);
        final DatumReader<GenericRecord> recordReader =
                new GenericDatumReader<>(schema, schema, quotingData);
        return recordReader.read(null, binaryDecoder);
    }
}

Schema

{"type":"record","name":"tsp_name","fields":[{"name":"tsp","type":{"type":"long","logicalType":"timestamp-millis"}}]}

Current Output

{"tsp":2018-06-28T23:29:48.182Z} // missing quotes so not a valid json

Expected Output

{"tsp": "2018-06-28T23:29:48.182Z"}
Fino answered 23/8, 2018 at 9:21 Comment(1)
just edit my previous answer.Germinate
G
16

To change the projection, you can extend the conversion to return a string for the timestamp-millis logical type. The following code results in your expected output:

import org.apache.avro.*;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.*;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Timestamp;

public class Main5 {
    public static void main(String [] args ) throws IOException {
        Timestamp tsp = new Timestamp(1530228588182L);
        String strSchema = "{\"type\":\"record\",\"name\":\"tsp_name\",\"fields\":[{\"name\":\"tsp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}\n";
        Schema schema = new Schema.Parser().parse(strSchema);
        System.out.println(new DateTime(tsp.getTime(), DateTimeZone.UTC));
        GenericRecord genericRecord = new GenericData.Record(schema);
        genericRecord.put("tsp",tsp.getTime()); //Assume I cannot change this
        System.out.println(genericRecord);
        System.out.println(deserialize(schema, toByteArray(schema , genericRecord)));
    }

    public static byte [] toByteArray(Schema schema, GenericRecord genericRecord) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
        writer.getData().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
        writer.write(genericRecord, encoder);
        encoder.flush();
        return baos.toByteArray();
    }


    public static GenericRecord deserialize(Schema schema, byte[] data) throws IOException {
        final GenericData genericData = new GenericData();
        genericData.addLogicalTypeConversion(new MyTimestampConversion());
        InputStream is = new ByteArrayInputStream(data);
        Decoder decoder = DecoderFactory.get().binaryDecoder(is, null);
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema, schema, genericData);
        return reader.read(null, decoder);
    }

    public static class MyTimestampConversion extends Conversion<String> {
        public MyTimestampConversion() {
        }

        public Class<String> getConvertedType() {
            return String.class;
        }

        public String getLogicalTypeName() {
            return "timestamp-millis";
        }

        public String fromLong(Long millisFromEpoch, Schema schema, LogicalType type) {
            return (new DateTime(millisFromEpoch, DateTimeZone.UTC)).toString();
        }

        public Long toLong(String timestamp, Schema schema, LogicalType type) {
            return new Long(timestamp);
        }
    }
}

Output: {"tsp": "2018-06-28T23:29:48.182Z"}

Germinate answered 27/8, 2018 at 14:6 Comment(1)
You can also use a Json encoder/decoder: JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, is)Shive
A
0

The problem with the first attempt is that the LogicalType information never leaves the schema; the GenericRecord just sees a long.

I suspect the second attempt fails because it writes JSON in the Avro format: looking at GenericDatumWriter, the conversion writes the base type (this is the same thing avro-tools does when dumping data).

Maybe you need a special case decoder to transform the timestamp to a string formatted the way you want?

Antiparticle answered 23/8, 2018 at 13:32 Comment(1)
I gave a third attempt. Now it looks so close but I am missing quotes so it is not a valid json. any suggestions?Fino

© 2022 - 2024 — McMap. All rights reserved.