In Java, how can I create an equivalent of an Apache Avro container file without being forced to use a File as a medium?
Asked Answered
G

3

21

This is somewhat of a shot in the dark in case anyone savvy with the Java implementation of Apache Avro is reading this.

My high-level objective is to have some way to transmit some series of avro data over the network (let's just say HTTP for example, but the particular protocol is not that important for this purpose). In my context I have a HttpServletResponse I need to write this data to somehow.

I initially attempted to write the data as what amounted to a virtual version of an avro container file (suppose that "response" is of type HttpServletResponse):

response.setContentType("application/octet-stream");
response.setHeader("Content-transfer-encoding", "binary");
ServletOutputStream outStream = response.getOutputStream();
BufferedOutputStream bos = new BufferedOutputStream(outStream);

Schema someSchema = Schema.parse(".....some valid avro schema....");
GenericRecord someRecord = new GenericData.Record(someSchema);
someRecord.put("somefield", someData);
...

GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema);
DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<GenericRecord>(datumWriter);
fileWriter.create(someSchema, bos);
fileWriter.append(someRecord);
fileWriter.close();
bos.flush();

This was all fine and dandy, except that it turns out Avro doesn't really provide a way to read a container file apart from an actual file: the DataFileReader only has two constructors:

public DataFileReader(File file, DatumReader<D> reader);

and

public DataFileReader(SeekableInput sin, DatumReader<D> reader);

where SeekableInput is some avro-specific customized form whose creation also ends up reading from a file. Now given that, unless there is some way to somehow coerce an InputStream into a File (https://mcmap.net/q/412148/-create-a-java-file-object-or-equivalent-using-a-byte-array-in-memory-without-a-physical-file suggests that there is not, and I have tried looking around the Java documentation as well), this approach won't work if the reader on the other end of the OutputStream receives that avro container file (I'm not sure why they allowed one to output avro binary container files to an arbitrary OutputStream without providing a way to read them from the corresponding InputStream on the other end, but that's beside the point). It seems that the implementation of the container file reader requires the "seekable" functionality that a concrete File provides.

Okay, so it doesn't look like that approach will do what I want. How about creating a JSON response that mimics the avro container file?

public static Schema WRAPPER_SCHEMA = Schema.parse(
  "{\"type\": \"record\", " +
   "\"name\": \"AvroContainer\", " +
   "\"doc\": \"a JSON avro container file\", " +
   "\"namespace\": \"org.bar.foo\", " +
   "\"fields\": [" +
     "{\"name\": \"schema\", \"type\": \"string\", \"doc\": \"schema representing the included data\"}, " +
     "{\"name\": \"data\", \"type\": \"bytes\", \"doc\": \"packet of data represented by the schema\"}]}"
  );

I'm not sure if this is the best way to approach this given the above constraints, but it looks like this might do the trick. I'll put the schema (of "Schema someSchema" from above, for instance) as a String inside the "schema" field, and then put in the avro-binary-serialized form of a record fitting that schema (ie. "GenericRecord someRecord") inside the "data" field.

I actually wanted to know about a specific detail of that which is described below, but I thought it would be worthwhile to give a bigger context as well, so that if there is a better high-level approach I could be taking (this approach works but just doesn't feel optimal) please do let me know.

My question is, assuming I go with this JSON-based approach, how do I write the avro binary representation of my Record into the "data" field of the AvroContainer schema? For example, I got up to here:

ByteArrayOutputStream baos = new ByteArrayOutputStream();
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema);
Encoder e = new BinaryEncoder(baos);
datumWriter.write(resultsRecord, e);
e.flush();

GenericRecord someRecord = new GenericData.Record(someSchema);
someRecord.put("schema", someSchema.toString());
someRecord.put("data", ByteBuffer.wrap(baos.toByteArray()));
datumWriter = new GenericDatumWriter<GenericRecord>(WRAPPER_SCHEMA);
JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(baos, JsonEncoding.UTF8);
e = new JsonEncoder(WRAPPER_SCHEMA, jsonGenerator);
datumWriter.write(someRecord, e);
e.flush();

PrintWriter printWriter = response.getWriter(); // recall that response is the HttpServletResponse
response.setContentType("text/plain");
response.setCharacterEncoding("UTF-8");
printWriter.print(baos.toString("UTF-8"));

I initially tried omitting the ByteBuffer.wrap clause, but then then the line

datumWriter.write(someRecord, e);

threw an exception that I couldn't cast a byte array into ByteBuffer. Fair enough, it looks like when the Encoder class (of which JsonEncoder is a subclass) is called to write an avro Bytes object, it requires a ByteBuffer to be given as an argument. Thus, I tried encapsulating the byte[] with java.nio.ByteBuffer.wrap, but when the data was printed out, it was printed as a straight series of bytes, without being passed through the avro hexadecimal representation:

"data": {"bytes": ".....some gibberish other than the expected format...}

That doesn't seem right. According to the avro documentation, the example bytes object they give says that I need to put in a json object, an example of which looks like "\u00FF", and what I have put in there is clearly not of that format. What I now want to know is the following:

  • What is an example of an avro bytes format? Does it look something like "\uDEADBEEFDEADBEEF..."?
  • How do I coerce my binary avro data (as output by the BinaryEncoder into a byte[] array) into a format that I can stick into the GenericRecord object and have it print correctly in JSON? For example, I want an Object DATA for which I can call on some GenericRecord "someRecord.put("data", DATA);" with my avro serialized data inside?
  • How would I then read that data back into a byte array on the other (consumer) end, when it is given the text JSON representation and wants to recreate the GenericRecord as represented by the AvroContainer-format JSON?
  • (reiterating the question from before) Is there a better way I could be doing all this?
Gabbi answered 24/9, 2011 at 8:42 Comment(5)
org.apache.avro.file.DataFileStream ?Lelea
SeekableInput is not just some avro-specific customized form whose creation ends up reading from a file. There is SeekableByteArrayInput which reads from a byte array in memory.Kalgan
Very good question -- and the requirement to need random access is very strange, since it is impossible to satisfy without possibly huge buffer. And yet it seems unnecessary to do as well... I don't know why it was felt random access is needed. Many other data formats do not add such requirements for processing.Straightaway
(Just happened across this.) I don't understand precisely what you're trying to do--if you're just transmitting an Avro message (like in a message queue) then the normal writing-to-a-byte-buffer if what you want: the schema is sent, the data is sent, it can all be recovered. What am I missing about your question?Culex
Dave - it's the "I want to send thousands of the same record" problem - sending an Avro Message for each record would mean sending the schema for every record. The Container File is a specified methodology for sending the schema once, followed by a bunch of records. Of course, you can do this yourself (as mentioned in one of the answers) - but why not follow a specification outlined by Avro if it's available?Islet
A
2

As Knut said, if you want to use something other than a file, you can either:

  • use SeekableByteArrayInput, as Knut said, for anything you can shoe-horn into a byte array
  • Implement SeekablInput in your own way - for example if you were getting it out of some weird database structure.
  • Or just use a file. Why not?

Those are your answers.

Amatory answered 20/6, 2012 at 9:23 Comment(1)
Also, using a file increases the overhead for disk I/O so if you're receiving a byte array through the network you don't want to put it in a file first and then read it (disk I/O round trip!!!).Patricepatrich
I
1

Under Java and Scala, we tried using inception via code generated using the Scala nitro codegen. Inception is how the Javascript mtth/avsc library solved this problem. However, we ran into several serialization problems using the Java library where there were erroneous bytes being injected into the byte stream, consistently - and we could not figure out where those bytes were coming from.

Of course that meant building our own implementation of Varint with ZigZag encoding. Meh.

Here it is:

package com.terradatum.query

import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.security.MessageDigest
import java.util.UUID

import akka.actor.ActorSystem
import akka.stream.stage._
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import com.nitro.scalaAvro.runtime.GeneratedMessage
import com.terradatum.diagnostics.AkkaLogging
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory
import org.elasticsearch.search.SearchHit

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

/*
* The original implementation of this helper relied exclusively on using the Header Avro record and inception to create
* the header. That didn't work for us because somehow erroneous bytes were injected into the output.
*
* Specifically:
* 1. 0x08 prepended to the magic
* 2. 0x0020 between the header and the sync marker
*
* Rather than continue to spend a large number of hours trying to troubleshoot why the Avro library was producing such
* erroneous output, we build the Avro Container File using a combination of our own code and Avro library code.
*
* This means that Terradatum code is responsible for the Avro Container File header (including magic, file metadata and
* sync marker) and building the blocks. We only use the Avro library code to build the binary encoding of the Avro
* records.
*
* @see https://avro.apache.org/docs/1.8.1/spec.html#Object+Container+Files
*/
object AvroContainerFileHelpers {

  val magic: ByteBuffer = {
    val magicBytes = "Obj".getBytes ++ Array[Byte](1.toByte)
    val mg = ByteBuffer.allocate(magicBytes.length).put(magicBytes)
    mg.position(0)
    mg
  }

  def makeSyncMarker(): Array[Byte] = {
    val digester = MessageDigest.getInstance("MD5")
    digester.update(s"${UUID.randomUUID}@${System.currentTimeMillis()}".getBytes)
    val marker = ByteBuffer.allocate(16).put(digester.digest()).compact()
    marker.position(0)
    marker.array()
  }

  /*
  * Note that other implementations of avro container files, such as the javascript library
  * mtth/avsc uses "inception" to encode the header, that is, a datum following a header
  * schema should produce valid headers. We originally had attempted to do the same but for
  * an unknown reason two bytes wore being inserted into our header, one at the very beginning
  * of the header before the MAGIC marker, and one right before the syncmarker of the header.
  * We were unable to determine why this wasn't working, and so this solution was used instead
  * where the record/map is encoded per the avro spec manually without the use of "inception."
  */
  def header(schema: Schema, syncMarker: Array[Byte]): Array[Byte] = {
    def avroMap(map: Map[String, ByteBuffer]): Array[Byte] = {
      val mapBytes = map.flatMap {
        case (k, vBuff) =>
          val v = vBuff.array()
          val byteStr = k.getBytes()
          Varint.encodeLong(byteStr.length) ++ byteStr ++ Varint.encodeLong(v.length) ++ v
      }
      Varint.encodeLong(map.size.toLong) ++ mapBytes ++ Varint.encodeLong(0)
    }

    val schemaBytes = schema.toString.getBytes
    val schemaBuffer = ByteBuffer.allocate(schemaBytes.length).put(schemaBytes)
    schemaBuffer.position(0)
    val metadata = Map("avro.schema" -> schemaBuffer)
    magic.array() ++ avroMap(metadata) ++ syncMarker
  }

  def block(binaryRecords: Seq[Array[Byte]], syncMarker: Array[Byte]): Array[Byte] = {
    val countBytes = Varint.encodeLong(binaryRecords.length.toLong)
    val sizeBytes = Varint.encodeLong(binaryRecords.foldLeft(0)(_+_.length).toLong)

    val buff: ArrayBuffer[Byte] = new scala.collection.mutable.ArrayBuffer[Byte]()

    buff.append(countBytes:_*)
    buff.append(sizeBytes:_*)
    binaryRecords.foreach { rec =>
      buff.append(rec:_*)
    }
    buff.append(syncMarker:_*)

    buff.toArray
  }

  def encodeBlock[T](schema: Schema, records: Seq[GenericRecord], syncMarker: Array[Byte]): Array[Byte] = {
    //block(records.map(encodeRecord(schema, _)), syncMarker)
    val writer = new GenericDatumWriter[GenericRecord](schema)
    val out = new ByteArrayOutputStream()
    val binaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
    records.foreach(record => writer.write(record, binaryEncoder))
    binaryEncoder.flush()
    val flattenedRecords = out.toByteArray
    out.close()

    val buff: ArrayBuffer[Byte] = new scala.collection.mutable.ArrayBuffer[Byte]()

    val countBytes = Varint.encodeLong(records.length.toLong)
    val sizeBytes = Varint.encodeLong(flattenedRecords.length.toLong)

    buff.append(countBytes:_*)
    buff.append(sizeBytes:_*)
    buff.append(flattenedRecords:_*)
    buff.append(syncMarker:_*)

    buff.toArray
  }

  def encodeRecord[R <: GeneratedMessage with com.nitro.scalaAvro.runtime.Message[R]: ClassTag](
      entity: R
  ): Array[Byte] =
    encodeRecord(entity.companion.schema, entity.toMutable)

  def encodeRecord(schema: Schema, record: GenericRecord): Array[Byte] = {
    val writer = new GenericDatumWriter[GenericRecord](schema)
    val out = new ByteArrayOutputStream()
    val binaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(record, binaryEncoder)
    binaryEncoder.flush()
    val bytes = out.toByteArray
    out.close()
    bytes
  }
}

/**
  * Encoding of integers with variable-length encoding.
  *
  * The avro specification uses a variable length encoding for integers and longs.
  * If the most significant bit in a integer or long byte is 0 then it knows that no
  * more bytes are needed, if the most significant bit is 1 then it knows that at least one
  * more byte is needed. In signed ints and longs the most significant bit is traditionally
  * used to represent the sign of the integer or long, but for us it's used to encode whether
  * more bytes are needed. To get around this limitation we zig-zag through whole numbers such that
  * negatives are odd numbers and positives are even numbers:
  *
  * i.e. -1, -2, -3 would be encoded as 1, 3, 5, and so on
  * while 1,  2,  3 would be encoded as 2, 4, 6, and so on.
  *
  * More information is available in the avro specification here:
  * @see http://lucene.apache.org/core/3_5_0/fileformats.html#VInt
  *      https://developers.google.com/protocol-buffers/docs/encoding?csw=1#types
  */
object Varint {

  import scala.collection.mutable

  def encodeLong(longVal: Long): Array[Byte] = {
    val buff = new ArrayBuffer[Byte]()
    Varint.zigZagSignedLong(longVal, buff)
    buff.toArray[Byte]
  }

  def encodeInt(intVal: Int): Array[Byte] = {
    val buff = new ArrayBuffer[Byte]()
    Varint.zigZagSignedInt(intVal, buff)
    buff.toArray[Byte]
  }

  def zigZagSignedLong[T <: mutable.Buffer[Byte]](x: Long, dest: T): Unit = {
    // sign to even/odd mapping: http://code.google.com/apis/protocolbuffers/docs/encoding.html#types
    writeUnsignedLong((x << 1) ^ (x >> 63), dest)
  }

  def writeUnsignedLong[T <: mutable.Buffer[Byte]](v: Long, dest: T): Unit = {
    var x = v
    while ((x & 0xFFFFFFFFFFFFFF80L) != 0L) {
      dest += ((x & 0x7F) | 0x80).toByte
      x >>>= 7
    }
    dest += (x & 0x7F).toByte
  }

  def zigZagSignedInt[T <: mutable.Buffer[Byte]](x: Int, dest: T): Unit = {
    writeUnsignedInt((x << 1) ^ (x >> 31), dest)
  }

  def writeUnsignedInt[T <: mutable.Buffer[Byte]](v: Int, dest: T): Unit = {
    var x = v
    while ((x & 0xFFFFF80) != 0L) {
      dest += ((x & 0x7F) | 0x80).toByte
      x >>>= 7
    }
    dest += (x & 0x7F).toByte
  }
}
Islet answered 16/4, 2017 at 17:1 Comment(0)
B
0

The way I solved this was to ship the schemas separately from the data. I set up a connection handshake that transmits the schemas down from the server, then I send encoded data back and forth. You have to create an outside wrapper object like this:

{'name':'Wrapper','type':'record','fields':[
  {'name':'schemaName','type':'string'},
  {'name':'records','type':{'type':'array','items':'bytes'}}
]}

Where you first encode your array of records, one by one, into an array of encoded byte arrays. Everything in one array should have the same schema. Then you encode the wrapper object with the above schema -- set "schemaName" to be the name of the schema you used to encode the array.

On the server, you will decode the wrapper object first. Once you decode the wrapper object, you know the schemaName, and you have an array of objects you know how to decode -- use as you will!

Note that you can get away without using the wrapper object if you use a protocol like WebSockets and an engine like Socket.IO (for Node.js) Socket.io gives you a channel-based communication layer between browser and server. In that case, just use a specific schema for each channel, encode each message before you send it. You still have to share the schemas when the connection initiates -- but if you are using WebSockets this is easy to implement. And when you are done you have an arbitrary number of strongly-typed, bidirectional streams between client and server.

Bujumbura answered 8/2, 2016 at 16:17 Comment(1)
While not a bad solution, it doesn't even come close to addressing the OP's stated question.Islet

© 2022 - 2024 — McMap. All rights reserved.