How to read file chunk by chunk from S3 using aws-java-sdk

I am trying to read a large file from S3 in chunks for parallel processing, without splitting any line across chunks.

Let me explain with an example: there is a 1 GB file on S3 and I want to divide it into 64 MB chunks. That part is easy; I can do it like this:

S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key));
InputStream stream = s3object.getObjectContent();

byte[] content = new byte[64 * 1024 * 1024];

while (stream.read(content) != -1) {
    // process content here
}

The problem is that a chunk may contain, say, 100 complete lines and one incomplete one at the end. I cannot process an incomplete line, but I don't want to discard it either.

Is there any way to handle this situation, so that no chunk ends with a partial line?

Ferdinana answered 6/6, 2017 at 11:46 Comment(0)

My usual approach (InputStream -> BufferedReader.lines() -> batches of lines -> CompletableFuture) won't work here because the underlying S3ObjectInputStream times out eventually for huge files.

So I created a new class, S3InputStream, which doesn't care how long it's open for and reads byte blocks on demand using short-lived AWS SDK calls. You provide a byte[] that will be reused. new byte[1 << 24] (16 MB) appears to work well.

package org.harrison;

import java.io.IOException;
import java.io.InputStream;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;

/**
 * An {@link InputStream} for S3 files that does not care how big the file is.
 *
 * @author stephen harrison
 */
public class S3InputStream extends InputStream {
    private static class LazyHolder {
        private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient();
    }

    private final String bucket;
    private final String file;
    private final byte[] buffer;
    private long lastByteOffset;

    private long offset = 0;
    private int next = 0;
    private int length = 0;

    public S3InputStream(final String bucket, final String file, final byte[] buffer) {
        this.bucket = bucket;
        this.file = file;
        this.buffer = buffer;
        this.lastByteOffset = LazyHolder.S3.getObjectMetadata(bucket, file).getContentLength() - 1;
    }

    @Override
    public int read() throws IOException {
        if (next >= length) {
            fill();

            if (length <= 0) {
                return -1;
            }

            next = 0;
        }

        if (next >= length) {
            return -1;
        }

        return buffer[this.next++] & 0xFF; // return the byte as an int in 0..255, per the InputStream contract
    }

    /** Refills the buffer with the next block of the object, using a short-lived ranged GET. */
    public void fill() throws IOException {
        if (offset > lastByteOffset) { // offset is past the final byte: nothing left to read
            length = -1;
        } else {
            try (final InputStream inputStream = s3Object()) {
                length = 0;
                int b;

                while ((b = inputStream.read()) != -1) {
                    buffer[length++] = (byte) b;
                }

                if (length > 0) {
                    offset += length;
                }
            }
        }
    }

    private InputStream s3Object() {
        final GetObjectRequest request = new GetObjectRequest(bucket, file).withRange(offset,
                offset + buffer.length - 1);

        return LazyHolder.S3.getObject(request).getObjectContent();
    }
}
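
A usage sketch for the class above (the bucket name, key, and line handling are placeholders I've assumed, not part of the original): wrapping S3InputStream in a BufferedReader yields complete lines, so no chunk boundary ever splits a line.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class S3InputStreamExample {
    public static void main(String[] args) throws IOException {
        byte[] buffer = new byte[1 << 24]; // 16 MB working buffer, reused across the ranged GETs

        // "my-bucket" and "path/to/big-file.txt" are placeholder values.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new S3InputStream("my-bucket", "path/to/big-file.txt", buffer), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Each line arrives complete; collect lines into batches here for parallel processing.
            }
        }
    }
}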
Task answered 19/11, 2017 at 19:21 Comment(1)
Thanks for this. I find this InputStream much more reliable than the one you get from the SDK getObject method. I updated it for v2 of the SDK; see my new answer. -- Mauramauralia

The aws-java-sdk already provides streaming functionality for your S3 objects: call getObject and the resulting S3Object exposes an InputStream.

1) AmazonS3Client.getObject(GetObjectRequest getObjectRequest) -> S3Object

2) S3Object.getObjectContent()

Note: The method is a simple getter and does not actually create a stream. If you retrieve an S3Object, you should close this input stream as soon as possible, because the object contents aren't buffered in memory and stream directly from Amazon S3. Further, failure to close this stream can cause the request pool to become blocked.

aws java docs
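
To illustrate the note about closing the stream, here is a minimal sketch assuming a default client; "my-bucket" and "my-key" are placeholder values:

import java.io.IOException;
import java.io.InputStream;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;

public class GetObjectExample {
    public static void main(String[] args) throws IOException {
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();

        S3Object s3Object = s3.getObject(new GetObjectRequest("my-bucket", "my-key"));

        // Close the content stream as soon as you are done with it so the
        // underlying HTTP connection is released back to the pool.
        try (InputStream content = s3Object.getObjectContent()) {
            byte[] chunk = new byte[8192];
            int read;
            while ((read = content.read(chunk)) != -1) {
                // process "read" bytes of chunk here
            }
        }
    }
}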

Polis answered 6/6, 2017 at 12:4 Comment(0)

100 complete line and one incomplete

Do you mean you need to read the stream line by line? If so, instead of using a plain InputStream, try reading the S3 object stream with a BufferedReader so that you can read it line by line, although I think this will be a little slower than reading by chunk.

S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key));
BufferedReader in = new BufferedReader(new InputStreamReader(s3object.getObjectContent()));
String line;
while ((line = in.readLine()) != null) {
    // process line here
}
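
If line-by-line reading alone is too slow for parallel processing, one option (a sketch of my own, not from the original answer; the 64 MB target, the processBatch helper, and the UTF-8 charset are assumptions) is to accumulate complete lines into roughly 64 MB batches and hand each batch off to workers, so every batch still ends on a line boundary:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;

public class LineBatchExample {
    private static final long BATCH_BYTES = 64L * 1024 * 1024; // target batch size

    public static void main(String[] args) throws IOException {
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();

        // "my-bucket" and "my-key" are placeholders.
        S3Object s3Object = s3.getObject(new GetObjectRequest("my-bucket", "my-key"));

        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(s3Object.getObjectContent(), StandardCharsets.UTF_8))) {
            List<String> batch = new ArrayList<>();
            long batchSize = 0;
            String line;

            while ((line = in.readLine()) != null) {
                batch.add(line);
                batchSize += line.length() + 1; // rough byte count; assumes mostly single-byte characters

                if (batchSize >= BATCH_BYTES) {
                    processBatch(batch); // e.g. submit to an ExecutorService for parallel processing
                    batch = new ArrayList<>();
                    batchSize = 0;
                }
            }

            if (!batch.isEmpty()) {
                processBatch(batch);
            }
        }
    }

    private static void processBatch(List<String> lines) {
        // placeholder: process one batch of complete lines
    }
}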
Cockswain answered 7/7, 2017 at 1:24 Comment(0)

You can read all the files in the bucket by paging through the continuation tokens, and you can read the files with other Java libraries, e.g. PDFBox for PDFs.

import java.io.IOException;
import java.io.InputStream;

import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.text.PDFTextStripper;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
//..
// in your main class
private static AWSCredentials credentials = null;
private static AmazonS3 amazonS3Client = null;

public static void initializeAmazonObjects() {
    credentials = new BasicAWSCredentials(ACCESS_KEY, SECRET_ACCESS_KEY);
    amazonS3Client = new AmazonS3Client(credentials);
}

public void mainMethod() throws IOException, AmazonS3Exception {
    // connect to AWS
    initializeAmazonObjects();

    ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(bucketName);
    ListObjectsV2Result listObjectsResult;

    do {
        listObjectsResult = amazonS3Client.listObjectsV2(req);

        for (S3ObjectSummary objectSummary : listObjectsResult.getObjectSummaries()) {
            System.out.printf(" - %s (size: %d)\n", objectSummary.getKey(), objectSummary.getSize());

            String key = objectSummary.getKey();

            // only try to read PDF files
            if (!key.contains(".pdf")) {
                continue;
            }

            // Read the source file as text
            String pdfFileInText = readAwsFile(objectSummary.getBucketName(), key);
            if (pdfFileInText.isEmpty()) {
                continue;
            }
        } // end of current batch

        // If there are more than maxKeys (1000 by default) keys in the bucket,
        // get a continuation token and list the next objects.
        String token = listObjectsResult.getNextContinuationToken();
        System.out.println("Next Continuation Token: " + token);
        req.setContinuationToken(token);
    } while (listObjectsResult.isTruncated());
}

public String readAwsFile(String bucketName, String keyName) {
    String pdfFileInText = "";
    try {
        S3Object object = amazonS3Client.getObject(new GetObjectRequest(bucketName, keyName));

        // Load the PDF from the S3 stream and close both the stream and the document when done.
        try (InputStream objectData = object.getObjectContent();
             PDDocument document = PDDocument.load(objectData)) {

            if (!document.isEncrypted()) {
                PDFTextStripper tStripper = new PDFTextStripper();
                tStripper.setSortByPosition(true);
                pdfFileInText = tStripper.getText(document);
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return pdfFileInText;
}
Perot answered 20/9, 2018 at 9:20 Comment(0)

The @stephen-harrison answer works well. I updated it for v2 of the SDK. I made a couple of tweaks: mainly, the connection can now be authorized with explicit credentials, and the LazyHolder class is no longer static -- I couldn't figure out how to authorize the connection and still keep the class static.

For another approach using Scala, see https://alexwlchan.net/2019/09/streaming-large-s3-objects/

package foo.whatever;

import java.io.IOException;
import java.io.InputStream;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;

/**
 * Adapted for aws Java sdk v2 by [email protected]
 *
 * An {@link InputStream} for S3 files that does not care how big the file is.
 *
 * @author stephen harrison
 */
public class S3InputStreamV2 extends InputStream {
    private class LazyHolder {
        String appID;
        String secretKey;
        Region region = Region.US_WEST_1;
        public S3Client S3 = null;

        public void connect() {
            AwsBasicCredentials awsCreds = AwsBasicCredentials.create(appID, secretKey);
            S3 = S3Client.builder().region(region)
                    .credentialsProvider(StaticCredentialsProvider.create(awsCreds))
                    .build();
        }

        private HeadObjectResponse getHead(String keyName, String bucketName) {
            HeadObjectRequest objectRequest = HeadObjectRequest.builder().key(keyName).bucket(bucketName).build();
            return S3.headObject(objectRequest);
        }
    }

    private LazyHolder lazyHolder = new LazyHolder();

    private final String bucket;
    private final String file;
    private final byte[] buffer;
    private long lastByteOffset;

    private long offset = 0;
    private int next = 0;
    private int length = 0;

    public S3InputStreamV2(final String bucket, final String file, final byte[] buffer, String appID, String secret) {
        this.bucket = bucket;
        this.file = file;
        this.buffer = buffer;
        lazyHolder.appID = appID;
        lazyHolder.secretKey = secret;
        lazyHolder.connect();
        this.lastByteOffset = lazyHolder.getHead(file, bucket).contentLength();
    }

    @Override
    public int read() throws IOException {
        if (next >= length || (next == buffer.length && length == buffer.length)) {
            fill();

            if (length <= 0) {
                return -1;
            }

            next = 0;
        }

        if (next >= length) {
            return -1;
        }

        return buffer[this.next++] & 0xFF;
    }

    public void fill() throws IOException {
        if (offset >= lastByteOffset) {
            length = -1;
        } else {
            try (final InputStream inputStream = s3Object()) {
                length = 0;
                int b;

                while ((b = inputStream.read()) != -1) {
                    buffer[length++] = (byte) b;
                }

                if (length > 0) {
                    offset += length;
                }
            }
        }
    }

    private InputStream s3Object() {
        final Long rangeEnd = offset + buffer.length - 1;
        final String rangeString = "bytes=" + offset + "-" + rangeEnd;
        final GetObjectRequest getObjectRequest = GetObjectRequest.builder()
                .bucket(bucket).key(file).range(rangeString).build();

        return lazyHolder.S3.getObject(getObjectRequest);
    }
}
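
A minimal usage sketch for the v2 class (the credentials, bucket, key, and buffer size below are placeholders I've assumed):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class S3InputStreamV2Example {
    public static void main(String[] args) throws IOException {
        byte[] buffer = new byte[5 * 1024 * 1024]; // 5 MB working buffer, reused across the ranged GETs

        S3InputStreamV2 s3Stream = new S3InputStreamV2(
                "my-bucket", "path/to/big-file.txt", buffer, "MY_APP_ID", "MY_SECRET_KEY");

        try (BufferedReader reader = new BufferedReader(new InputStreamReader(s3Stream, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // each line arrives complete, regardless of where the ranged reads fall
            }
        }
    }
}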
Mauramauralia answered 17/11, 2021 at 14:5 Comment(2)
One note: this class works very reliably. I have used it with byte[] buffers from 0.5 to 5 MB. Larger buffer sizes are faster, but obviously take more memory. Be aware that the smaller your buffer, the more connections you will make to S3, which will incur more costs. -- Mauramauralia
These amendments are wrong. E.g. (next == buffer.length && length == buffer.length) is always false, and buffer[this.next++] & 0xFF throws an Exception at the end. The V1 works fine. -- Barrault

I got puzzled while we were migrating from the AWS SDK V1 to V2 and realised that the V2 SDK does not define the range the same way.

With AWS V1 SDK

S3Object currentS3Obj = client.getObject(new GetObjectRequest(bucket, key).withRange(start, end));
return currentS3Obj.getObjectContent();

With AWS V2 SDK

var range = String.format("bytes=%d-%d", start, end);
ResponseBytes<GetObjectResponse> currentS3Obj = client.getObjectAsBytes(
        GetObjectRequest.builder().bucket(bucket).key(key).range(range).build());
return currentS3Obj.asInputStream();
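
Note that getObjectAsBytes buffers the whole (ranged) response in memory. If you want a stream instead, the V2 SDK's getObject returns a ResponseInputStream directly; a minimal sketch (bucket, key, and range values are placeholders):

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

public class V2RangedStreamExample {
    public static void main(String[] args) throws Exception {
        S3Client client = S3Client.create();

        long start = 0;
        long end = 64L * 1024 * 1024 - 1; // first 64 MB of the object

        GetObjectRequest request = GetObjectRequest.builder()
                .bucket("my-bucket") // placeholder
                .key("my-key")       // placeholder
                .range(String.format("bytes=%d-%d", start, end))
                .build();

        // getObject streams the ranged response rather than buffering it all in memory.
        try (ResponseInputStream<GetObjectResponse> stream = client.getObject(request)) {
            byte[] chunk = new byte[8192];
            int read;
            while ((read = stream.read(chunk)) != -1) {
                // process "read" bytes of chunk here
            }
        }
    }
}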
Parapsychology answered 14/12, 2022 at 14:32 Comment(0)
