Read parquet data from ByteArrayOutputStream instead of file

I would like to convert this code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;


public class ParquetReaderUtils {

    public static Parquet getParquetData(String filePath) throws IOException {
        List<SimpleGroup> simpleGroups = new ArrayList<>();
        ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(filePath), new Configuration()));
        MessageType schema = reader.getFooter().getFileMetaData().getSchema();
        //List<Type> fields = schema.getFields();
        PageReadStore pages;
        while ((pages = reader.readNextRowGroup()) != null) {
            long rows = pages.getRowCount();
            MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
            RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));

            for (int i = 0; i < rows; i++) {
                SimpleGroup simpleGroup = (SimpleGroup) recordReader.read();
                simpleGroups.add(simpleGroup);
            }
        }
        reader.close();
        return new Parquet(simpleGroups, schema);
    }
}

(which is from https://www.arm64.ca/post/reading-parquet-files-java/)

to take a ByteArrayOutputStream parameter instead of a filePath.

Is this possible? I don't see a ParquetStreamReader in org.apache.parquet.hadoop.

Any help is appreciated. I am trying to write a test app for Parquet data coming from Kafka, and writing each of many messages out to a file first is rather slow.

Karlotta answered 27/9, 2019 at 20:56 Comment(3)
Just implement the org.apache.parquet.io.InputFile interface, as org.apache.parquet.hadoop.util.HadoopInputFile does. The only thing you have to do is make a byte array out of your output stream, wrap it in a ByteArrayInputStream, and pass that to an org.apache.parquet.io.DelegatingSeekableInputStream. – Elnora
Thanks for the tip, @m4gic. I will try this once the bounty expires. – Karlotta
Try ParquetReader<GenericRecord> parquetReader = AvroParquetReader.<GenericRecord>builder(new ParquetByteArrayInputFile(parquetFile.getBytes())).build(); you can also see my answer below. – Scyphozoan

Without deeper testing, I would try this class (provided the content of the output stream is Parquet-compatible). I added a streamId to make it easier to identify which byte array is being processed (ParquetFileReader prints the instance's toString() when something goes wrong).

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

public class ParquetStream implements InputFile {
    private final String streamId;
    private final byte[] data;

    // ByteArrayInputStream already tracks its position in the protected 'pos' field;
    // this subclass simply exposes it so the stream can be repositioned.
    private static class SeekableByteArrayInputStream extends ByteArrayInputStream {
        public SeekableByteArrayInputStream(byte[] buf) {
            super(buf);
        }

        public void setPos(int pos) {
            this.pos = pos;
        }

        public int getPos() {
            return this.pos;
        }
    }

    public ParquetStream(String streamId, ByteArrayOutputStream stream) {
        this.streamId = streamId;
        this.data = stream.toByteArray();
    }

    @Override
    public long getLength() throws IOException {
        return this.data.length;
    }

    @Override
    public SeekableInputStream newStream() throws IOException {
        return new DelegatingSeekableInputStream(new SeekableByteArrayInputStream(this.data)) {
            @Override
            public void seek(long newPos) throws IOException {
                ((SeekableByteArrayInputStream) this.getStream()).setPos((int) newPos);
            }

            @Override
            public long getPos() throws IOException {
                return ((SeekableByteArrayInputStream) this.getStream()).getPos();
            }
        };
    }

    @Override
    public String toString() {
        return "ParquetStream[" + streamId + "]";
    }
}
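
For completeness, here is a sketch (not tested in this exact form) of how the getParquetData method from the question could use this class: ParquetFileReader.open(InputFile) accepts the in-memory input directly, Parquet is the holder class from the question, and the streamId "test-stream" is arbitrary.

import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ParquetReaderUtils {

    public static Parquet getParquetData(ByteArrayOutputStream stream) throws IOException {
        List<SimpleGroup> simpleGroups = new ArrayList<>();
        // Open the reader on the in-memory InputFile instead of a Hadoop Path
        try (ParquetFileReader reader = ParquetFileReader.open(new ParquetStream("test-stream", stream))) {
            MessageType schema = reader.getFooter().getFileMetaData().getSchema();
            PageReadStore pages;
            while ((pages = reader.readNextRowGroup()) != null) {
                long rows = pages.getRowCount();
                MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
                RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
                for (long i = 0; i < rows; i++) {
                    simpleGroups.add((SimpleGroup) recordReader.read());
                }
            }
            return new Parquet(simpleGroups, schema);
        }
    }
}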
Elnora answered 6/10, 2019 at 21:46 Comment(2)
I'm successfully using this approach. For my own implementation, I also created a ByteArraySeekableInputStream class which stores the SeekableByteArrayInputStream delegate in a field, rather than declaring an anonymous inner class; this avoids the ((SeekableByteArrayInputStream) this.getStream()) cast (a sketch of that variant follows these comments). – Elbow
Very nice alternative to the huge hadoop-aws dependency, thanks. +1 – Hereto
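
For reference, a minimal sketch of the variant described in the first comment. The class name ByteArraySeekableInputStream comes from the comment; the body is an assumption about what such a class might look like, and it presumes SeekableByteArrayInputStream has been pulled out of ParquetStream into its own (non-private) class.

import java.io.IOException;

import org.apache.parquet.io.DelegatingSeekableInputStream;

public class ByteArraySeekableInputStream extends DelegatingSeekableInputStream {
    // Typed reference to the delegate, so seek()/getPos() need no cast
    private final SeekableByteArrayInputStream delegate;

    public ByteArraySeekableInputStream(SeekableByteArrayInputStream delegate) {
        super(delegate);
        this.delegate = delegate;
    }

    @Override
    public void seek(long newPos) throws IOException {
        delegate.setPos((int) newPos);
    }

    @Override
    public long getPos() throws IOException {
        return delegate.getPos();
    }
}

With this in place, ParquetStream.newStream() can simply return new ByteArraySeekableInputStream(new SeekableByteArrayInputStream(this.data)).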

Maybe a bit of an old question, but I had the same issue when reading from a MultipartFile as part of an API. I am using this to validate some data. Below is my code, without writing any new classes.

<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.3.1</version>
</dependency>

ParquetReader<GenericRecord> parquetReader = AvroParquetReader
            .<GenericRecord>builder(new ParquetByteArrayInputFile(parquetFile.getBytes())).build();
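
Once the reader is built, records can be pulled with the usual read loop; a short sketch (the field name "someColumn" is just a placeholder):

GenericRecord record;
while ((record = parquetReader.read()) != null) {
    // inspect/validate individual fields, e.g. record.get("someColumn")
}
parquetReader.close();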
Scyphozoan answered 20/12, 2023 at 17:44 Comment(1)
You mentioned ParquetByteArrayInputFile; this must be a custom implementation, or am I missing something? – Aaren
