Q: Converting Avro to Parquet in Memory

I am receiving Avro records from Kafka. I want to convert these records into Parquet files. I am following this blog post: http://blog.cloudera.com/blog/2014/05/how-to-convert-existing-data-into-parquet/

The code so far looks roughly like this:

void write(final String fileName, final SinkRecord record, final AvroData avroData) throws IOException {
    final Schema avroSchema = avroData.fromConnectSchema(record.valueSchema());
    final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;

    final int blockSize = 256 * 1024 * 1024;  // 256 MB row group size
    final int pageSize = 64 * 1024;           // 64 KB page size

    final Path path = new Path(fileName);
    final AvroParquetWriter<GenericRecord> writer =
            new AvroParquetWriter<>(path, avroSchema, compressionCodecName, blockSize, pageSize);
}

Now, this will do the Avro to Parquet conversion, but it will write the Parquet file to the disk. I was wondering if there was an easier way to just keep the file in memory so that I don't have to manage temp files on the disk. Thank you

Shop answered 22/9, 2016 at 6:31

Please check my blog post, https://yanbin.blog/convert-apache-avro-to-parquet-format-in-java/ (translate it into English if necessary).

package yanbin.blog;
 
import org.apache.parquet.io.DelegatingPositionOutputStream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;
 
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
 
public class InMemoryOutputFile implements OutputFile {
    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
    @Override
    public PositionOutputStream create(long blockSizeHint) throws IOException { // Mode.CREATE calls this method
        return new InMemoryPositionOutputStream(baos);
    }
 
    @Override
    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException { // Mode.OVERWRITE calls this method
        return new InMemoryPositionOutputStream(baos);
    }
 
    @Override
    public boolean supportsBlockSize() {
        return false;
    }
 
    @Override
    public long defaultBlockSize() {
        return 0;
    }
 
    public byte[] toArray() {
        return baos.toByteArray();
    }
 
    private static class InMemoryPositionOutputStream extends DelegatingPositionOutputStream {
 
        public InMemoryPositionOutputStream(OutputStream outputStream) {
            super(outputStream);
        }
 
        @Override
        public long getPos() throws IOException {
            return ((ByteArrayOutputStream) this.getStream()).size();
        }
    }
}
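
With that OutputFile implementation in place, the writer is built directly on top of it: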
// requires (among the standard Avro/Parquet imports):
//   org.apache.avro.Schema, org.apache.avro.generic.GenericData,
//   org.apache.avro.data.TimeConversions, org.apache.avro.specific.SpecificRecordBase,
//   org.apache.parquet.avro.AvroParquetWriter, org.apache.parquet.hadoop.ParquetWriter,
//   org.apache.parquet.hadoop.ParquetFileWriter, org.apache.parquet.hadoop.metadata.CompressionCodecName
public static <T extends SpecificRecordBase> void writeToParquet(List<T> avroObjects) throws IOException {
    Schema avroSchema = avroObjects.get(0).getSchema();
    GenericData genericData = GenericData.get();
    genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());

    InMemoryOutputFile outputFile = new InMemoryOutputFile();
    try (ParquetWriter<Object> writer = AvroParquetWriter.builder(outputFile)
            .withDataModel(genericData)
            .withSchema(avroSchema)
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            .withWriteMode(ParquetFileWriter.Mode.CREATE)
            .build()) {
        avroObjects.forEach(r -> {
            try {
                writer.write(r);
            } catch (IOException ex) {
                throw new UncheckedIOException(ex); // rethrow so a failed write is not silently ignored
            }
        });
    }

    // dump the in-memory data to a file for testing
    Files.write(Paths.get("./users-memory.parquet"), outputFile.toArray());
}
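
For illustration, a call might look like the following sketch; the User class here is a hypothetical Avro-generated record (any SpecificRecordBase subtype works), not something from the original post:

// "User" is a hypothetical Avro-generated SpecificRecordBase subclass
List<User> users = Arrays.asList(
        User.newBuilder().setName("alice").build(),
        User.newBuilder().setName("bob").build());
writeToParquet(users); // users-memory.parquet now holds the Parquet bytes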

Test data from memory

$ parquet-tools cat --json users-memory.parquet
$ parquet-tools schema users-memory.parquet
Foolish answered 24/2, 2021 at 5:48
"but it will write the Parquet file to the disk"
"if there was an easier way to just keep the file in memory"

From your question I understand that you don't want to manage partial (temp) files on disk. If you want only the complete file written to disk in Parquet format, with the intermediate data kept in memory, you can use a combination of a memory-mapped file and the Parquet format.

Write your data to a memory-mapped file; once the writes are done, convert the bytes to Parquet format and store them on disk.

Have a look at MappedByteBuffer.
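
A minimal sketch of that staging step, using FileChannel.map (the file name and region size are illustrative assumptions, not from the original answer):

import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class MappedStagingExample {
    public static void main(String[] args) throws IOException {
        // hypothetical staging file backing the mapped region
        try (FileChannel channel = FileChannel.open(Paths.get("staging.buf"),
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // map a 16 MiB region; writes go through the OS page cache
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 16 * 1024 * 1024);
            buffer.put("serialized Avro records".getBytes()); // stage the raw bytes
            buffer.force(); // flush the mapped region to disk once the writes are complete
        }
    }
}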

Dominiquedominium answered 5/10, 2016 at 9:50
