Converting a byte array to JSON with an Avro schema as input gives an error

I have a simple JSON payload:

String jsonPayload = "{\"empid\": \"6\",\"empname\": \"Saurabh\",\"address\": \"home\"}";
byte[] payloadBytes = jsonPayload.getBytes();

I created an Avro schema:

{"namespace": "sample.namespace",
 "type": "record",
 "name": "Employee",
 "fields": [
     {"name": "empid", "type": "string"},
     {"name": "empname",  "type": "string"},
     {"name": "address", "type": "string"}
 ]
}

When I try to convert the byte array using this schema, I get an exception:

org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -62
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:430)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)

It looks like there is a problem with String and CharSequence identification, but I am not able to pin down the exact cause.

Byte array to JSON converter method code:

public String byteArrayToJson(byte[] avro, Schema schema) throws IOException {
    boolean pretty = false;
    GenericDatumReader<GenericRecord> reader = null;
    JsonEncoder encoder = null;
    ByteArrayOutputStream output = null;
    try {
        reader = new GenericDatumReader<GenericRecord>(schema);
        InputStream input = new ByteArrayInputStream(avro);
        output = new ByteArrayOutputStream();
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
        encoder = EncoderFactory.get().jsonEncoder(schema, output, pretty);
        Decoder decoder = DecoderFactory.get().binaryDecoder(input, null);
        GenericRecord datum;
        // read records from the binary decoder until EOF, re-encoding each one as JSON
        while (true) {
            try {
                datum = reader.read(null, decoder);
            } catch (EOFException eofe) {
                break;
            }
            writer.write(datum, encoder);
        }
        encoder.flush();
        output.flush();
        return new String(output.toByteArray());
    } finally {
        try {
            if (output != null) output.close();
        } catch (Exception e) {
            // ignore close failures
        }
    }
}
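
For reference, a minimal sketch of producing an Avro binary byte[] for this schema with GenericDatumWriter (hypothetical; at the moment I only call jsonPayload.getBytes(), as shown above):

Schema schema = new Schema.Parser().parse(schemaString); // schemaString holds the Employee schema above
GenericRecord record = new GenericData.Record(schema);
record.put("empid", "6");
record.put("empname", "Saurabh");
record.put("address", "home");

ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(out, null);
new GenericDatumWriter<GenericRecord>(schema).write(record, binaryEncoder);
binaryEncoder.flush();
byte[] avroBytes = out.toByteArray();

String json = byteArrayToJson(avroBytes, schema); // input of this kind is what the binary decoder expects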
Oliana answered 18/12, 2015 at 7:6 Comment(2)
Did you find any solution, @Karthik? Actually I am getting the same error. – Simeon
@Simeon Nope, still waiting for a correct fix. – Oliana

Your problem is that your Avro data has the schema included.

If you want to read that Avro, you should use a different reader, DataFileReader.

Here is an example of how to read Avro in byte[] format with the schema included.

Scala example:

def deserializeGenericWithSchema(message: Array[Byte]): Seq[GenericRecord] = {
  val reader: DatumReader[GenericRecord] = new SpecificDatumReader[GenericRecord]()
  val fileReader = new DataFileReader(new SeekableByteArrayInput(message), reader)
  extractRec(fileReader, Seq.empty[GenericRecord])
}

@tailrec
def extractRec(fileReader: DataFileReader[GenericRecord], acc: Seq[GenericRecord]): Seq[GenericRecord] = {
  if (fileReader.hasNext) {
    val newElement = fileReader.next
    extractRec(fileReader, acc :+ newElement)
  } else {
    acc
  }
}

Java example:

public List<GenericRecord> deserializeGenericWithSchema(byte[] message) throws IOException {
    List<GenericRecord> listOfRecords = new ArrayList<>();
    DatumReader<GenericRecord> reader = new SpecificDatumReader<>();
    DataFileReader<GenericRecord> fileReader =
            new DataFileReader<>(new SeekableByteArrayInput(message), reader);
    while (fileReader.hasNext()) {
        listOfRecords.add(fileReader.next());
    }
    return listOfRecords;
}

PS: I wrote the solution in Scala and then translated it to Java without testing, so the Java solution may not be completely correct.
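
A minimal usage sketch (assuming the byte[] was written with DataFileWriter, which is what embeds the schema, and that schema and record are the Employee schema and a populated GenericRecord from the question):

DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<>(datumWriter);
fileWriter.create(schema, out);   // writes the schema into the container header
fileWriter.append(record);
fileWriter.close();

List<GenericRecord> records = deserializeGenericWithSchema(out.toByteArray());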

Countable answered 2/3, 2018 at 13:38 Comment(2)
Hi @fhuertas, could you please look into a similar issue here: #55914901 – Charybdis
According to the Avro getting started guide, GenericDatumReader should be used instead of SpecificDatumReader to deserialize into GenericRecord. – Rafe

You have to use Morphlines to convert JSON to Avro.

Here is the link: http://cloudera.github.io/cdk/docs/current/cdk-morphlines/morphlinesReferenceGuide.html#/cdk-morphlines-avro
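
For the JSON-to-Avro step itself, a minimal plain-Avro sketch (an alternative to Morphlines, assuming the JSON matches the schema exactly) would use Avro's jsonDecoder:

Schema schema = new Schema.Parser().parse(schemaString); // the Employee schema from the question
Decoder jsonDecoder = DecoderFactory.get().jsonDecoder(schema, jsonPayload);
GenericRecord record = new GenericDatumReader<GenericRecord>(schema).read(null, jsonDecoder);
// 'record' can then be written with a GenericDatumWriter and a binary encoder, or with a DataFileWriter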

Simeon answered 29/1, 2016 at 15:49 Comment(0)
