How to read Parquet file from S3 without spark? Java
Asked Answered
F

4

10

Currently, I am using the Apache ParquetReader for reading local parquet files, which looks something like this:

ParquetReader<GenericData.Record> reader = null;
    Path path = new Path("userdata1.parquet");
    try {
        reader = AvroParquetReader.<GenericData.Record>builder(path).withConf(new Configuration()).build();
        GenericData.Record record;
        while ((record = reader.read()) != null) {
            System.out.println(record);

However, I am trying to access a parquet file through S3 without downloading it. Is there a way to parse Inputstream directly with parquet reader?

Flatwise answered 9/4, 2020 at 17:2 Comment(0)
A
6

Yes, the latest versions of hadoop include support for S3 filesystem. Use the s3a client from hadoop-aws library to directly access the S3 filesystem.

The HadoopInputFile Path should be constructed as s3a://bucket-name/prefix/key along with the authentication credentials access_key and secret_key configured using the properties

  • fs.s3a.access.key
  • fs.s3a.secret.key

Additionally, you would require these dependant libraries

  • hadoop-common JAR
  • aws-java-sdk-bundle JAR

Read more: Relevant configuration properties

Aceldama answered 14/4, 2020 at 19:6 Comment(0)
P
3

I got it working with this following dependencies

compile 'org.slf4j:slf4j-api:1.7.5'
compile 'org.slf4j:slf4j-log4j12:1.7.5'
compile 'org.apache.parquet:parquet-avro:1.12.0'
compile 'org.apache.avro:avro:1.10.2'
compile 'com.google.guava:guava:11.0.2'
compile 'org.apache.hadoop:hadoop-client:2.4.0'
compile 'org.apache.hadoop:hadoop-aws:3.3.0'   
compile 'org.apache.hadoop:hadoop-common:3.3.0'      
compile 'com.amazonaws:aws-java-sdk-core:1.11.563'
compile 'com.amazonaws:aws-java-sdk-s3:1.11.563'

Example

Path path = new Path("s3a://yours3path");
Configuration conf = new Configuration();
conf.set("fs.s3a.access.key", "KEY");
conf.set("fs.s3a.secret.key", "SECRET");
conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
conf.setBoolean("fs.s3a.path.style.access", true);
conf.setBoolean(org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED, true);

InputFile file = HadoopInputFile.fromPath(path, conf);
ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file).build();
GenericRecord record;
while ((record = reader.read()) != null) {
  System.out.println(record);
}
Per answered 8/4, 2021 at 3:6 Comment(1)
you should use aws-java-sdk-bundle rather than the sdk stuff; avoids jackson, httpclient classpath issues. Also there's a bit of code in hadoop 3.3.0 which only links to the shaded JAR (fixed in forthcoming releases)Cheltenham
E
1

Just adding on top of @franklinsijo , for freshers starting S3, Please note that access key and secret key is set for Hadoop Configuration: Here is a snippet of code that might be useful:

public static void main(String[] args) throws IOException {
String PATH_SCHEMA = "s3a://xxx/xxxx/userdata1.parquet";
Path path = new Path(PATH_SCHEMA);
Configuration conf = new Configuration();
conf.set("fs.s3a.access.key", "xxxxx");
conf.set("fs.s3a.secret.key", "xxxxx");
InputFile file = HadoopInputFile.fromPath(path, conf);
ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file).build();
GenericRecord record;
while ((record = reader.read()) != null) {
  System.out.println(record.toString());
}

My imports:

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
Expenditure answered 24/2, 2021 at 11:57 Comment(0)
J
0

My use-case was to copy-paste the parquet file from one S3 location of AWS account A to another S3 location of AWS account B, without using spark. I used below snippet to perform the same.

The key takeaway for me was to use a Bytes array as an intermediate data structure.

Imports:

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

Code snippet:

String inputBucket = "XXXX";
String outputBucket = "XXXX";
String s3Prefix = "XXXX";

String awsAccessKey = "XXXXX";
String awsSecretKey = "XXXXX";

AWSCredentials credentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey);
AmazonS3 s3client = new AmazonS3Client(credentials);

FileSystem fileSystem = FileSystem.get(URI.create("s3a:" + outputBucket), new JobConf(MyClass.class));
FSDataOutputStream fsDataOutputStream = null;
int count = 0;

try {
    ObjectListing objectListing = s3client.listObjects(new ListObjectsRequest().withBucketName(inputBucket)
            .withPrefix(s3Prefix));
    for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {

    String fileName = objectSummary.getKey().substring(objectSummary.getKey().lastIndexOf(DELIMITER) + 1);
    String outputPath = "s3a:" + outputBucket + s3Prefix + "/" + fileName;
    fsDataOutputStream = fileSystem.create(new Path(outputPath));

    InputStream inputStream = s3client.getObject(new GetObjectRequest(inputBucket, objectSummary.getKey())).getObjectContent();
    byte[] buffer = new byte[4096];
    int bytesRead = inputStream.read(buffer);
    while (bytesRead != -1) {
        fsDataOutputStream.write(buffer, 0, bytesRead);
        bytesRead = inputStream.read(buffer);
    }
    inputStream.close();
    fsDataOutputStream.close();

    count += 1;
    log.info("Successfully downloaded file to temporary path: {}", outputPath);
}
} catch (IOException e) {
    e.printStackTrace();
} finally {
    try {
        if (fsDataOutputStream != null) {
            fsDataOutputStream.close();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
Jauregui answered 28/6, 2024 at 13:24 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.