Avro decoding gives java.io.EOFException
I use an Apache Avro schema with Kafka 0.0.8V. I use the same schema at the producer and consumer ends, and the schema has not changed. However, I get an exception at the consumer when I try to consume the messages. Why do I get this error?

Producer

public void sendFile(String topic, GenericRecord payload, Schema schema) throws CoreException, IOException {
    BinaryEncoder encoder = null;
    ByteArrayOutputStream out = null;
    try {
        DatumWriter<GenericRecord> writer = new SpecificDatumWriter<GenericRecord>(schema);
        out = new ByteArrayOutputStream();
        encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(payload, encoder);
        encoder.flush();

        byte[] serializedBytes = out.toByteArray();

        KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(topic, serializedBytes);

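        producer.send(message);
    } finally {
        // Release the stream once the message has been handed to the producer.
        if (out != null) {
            out.close();
        }
    }
}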

Consumer

public void run() {
        try {
            ConsumerIterator<byte[], byte[]> itr = stream.iterator();
            while (itr.hasNext()) {

                byte[] data = itr.next().message();

                Schema schema = new Schema.Parser()
                        .parse(new File("/Users/xx/avro_schemas/file.avsc"));

                DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
                Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);

                GenericRecord payload = reader.read(null, decoder);
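                System.out.println("Message received --: " + payload);
            }
        } catch (IOException e) {
            // Schema parsing and reader.read() both declare IOException.
            throw new RuntimeException(e);
        }
}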

But I get the following exception when the reader tries to read the message from the decoder:

java.io.EOFException
    at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
    at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:259)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at com.xx.KafkaMessageListenerThread.run(KafkaMessageListenerThread.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
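
Reading the trace from the top: BinaryDecoder.readString calls readInt to fetch the length prefix of a string field, and ensureBounds throws because the byte array has already run out. In other words, the reader expected more bytes than the message contained. The same failure mode can be reproduced with the JDK alone. This is a minimal sketch, assuming DataOutputStream.writeUTF and DataInputStream.readUTF as a stand-in for a length-prefixed string encoding. It is not Avro's wire format, and the class and method names (TruncatedReadDemo, encode, decode, failsWithEof) are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;

public class TruncatedReadDemo {

    // Encodes a string as a 2-byte length prefix followed by its bytes.
    public static byte[] encode(String s) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream data = new DataOutputStream(out);
        data.writeUTF(s);
        data.flush();
        return out.toByteArray();
    }

    // Reads the length prefix, then exactly that many bytes.
    public static String decode(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        return in.readUTF();
    }

    // Returns true when decoding runs out of input before the value is complete.
    public static boolean failsWithEof(byte[] bytes) {
        try {
            decode(bytes);
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] full = encode("hello");
        // Drop the last two bytes to simulate a message shorter than the reader expects.
        byte[] truncated = Arrays.copyOf(full, full.length - 2);

        System.out.println("full decodes to: " + decode(full));
        System.out.println("truncated fails with EOFException: " + failsWithEof(truncated));
    }
}
```

If the bytes written and the layout the reader expects disagree, the reader runs past the end of the array in the same way, so comparing the writer's schema and the exact bytes sent against what the consumer receives is a reasonable first check.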

Consumer properties

enable.auto.commit=true
auto.commit.interval.ms=101
session.timeout.ms=7000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
zookeeper.connect=zookeeper.xx.com\:2181
heartbeat.interval.ms=1000
auto.offset.reset=smallest
serializer.class=kafka.serializer.DefaultEncoder
bootstrap.servers=kafka.xx.com\:9092
group.id=test
consumer.timeout.ms=-1
fetch.min.bytes=1
receive.buffer.bytes=262144
Bonheur answered 18/4, 2016 at 3:56 Comment(0)

The problem is caused by your Avro producer.

In the sendFile() method, you are not flushing the encoder and not closing the ByteArrayOutputStream, which causes the EOFException.

Here is an example of a generic serialization class:

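import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;

public class TestSerializer<T> {

    private final Class<T> avroType;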

    public TestSerializer(Class<T> avroType) {
        this.avroType = avroType;
    }

    public byte[] serialize(T object)
    {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        DatumWriter<T> writer = new SpecificDatumWriter<T>(avroType);
        try
        {
            writer.write(object, encoder);
        } catch (IOException e)
        {
            throw new RuntimeException(e);
        } finally
        {
            //Here is the flushing and closing
            try
            {
                if (encoder != null)
                {
                    encoder.flush();
                }
                if (out != null)
                {
                    out.close();
                }
            } catch (IOException e)
            {
                throw new RuntimeException(e);
            }
        }

        return out.toByteArray();

    }

}
Plumbic answered 1/2, 2018 at 12:25 Comment(2)
Thanks for the answer, this helped me out as well :) - Bugloss
He is flushing the encoder, at encoder.flush(), and closing the ByteArrayOutputStream makes no practical difference. - Morice
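
The point made in the last comment can be checked without Avro. This is a minimal sketch, assuming a java.io.BufferedOutputStream as a stand-in for any encoder that buffers before writing to the underlying stream. The class and method names (FlushDemo, withoutFlush, withFlush, afterClose) are hypothetical. It shows that bytes reach the ByteArrayOutputStream only after flush(), and that close() on a ByteArrayOutputStream has no effect, so toByteArray() returns the same data afterwards.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class FlushDemo {

    private static final byte[] PAYLOAD = {1, 2, 3, 4};

    // Writes through a buffer but never flushes: nothing reaches 'out'.
    public static int withoutFlush() throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BufferedOutputStream buffered = new BufferedOutputStream(out, 64);
        buffered.write(PAYLOAD);
        return out.toByteArray().length;
    }

    // Flushing pushes the buffered bytes into 'out'.
    public static int withFlush() throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BufferedOutputStream buffered = new BufferedOutputStream(out, 64);
        buffered.write(PAYLOAD);
        buffered.flush();
        return out.toByteArray().length;
    }

    // Closing a ByteArrayOutputStream does not discard or change its contents.
    public static int afterClose() throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BufferedOutputStream buffered = new BufferedOutputStream(out, 64);
        buffered.write(PAYLOAD);
        buffered.flush();
        out.close();
        return out.toByteArray().length;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("without flush: " + withoutFlush());
        System.out.println("with flush:    " + withFlush());
        System.out.println("after close:   " + afterClose());
    }
}
```

Under these assumptions the flush is the step that matters, and the producer in the question already performs it before calling toByteArray().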
