Upload ZipOutputStream to S3 without saving the (large) zip file temporarily to disk using the AWS S3 Java SDK

I have a requirement to download photos (not in the same directory) from S3, ZIP them, and upload the ZIP back to S3 using the AWS S3 Java SDK. The ZIP file can reach several GB in size. Currently I am using AWS Lambda, which limits temporary storage to 500 MB. So I don't want to save the ZIP file to disk; instead, I want to stream the ZIP file (created dynamically from the photos downloaded from S3) directly to S3. I need to do this with the AWS S3 Java SDK.

Erastes answered 17/3, 2019 at 5:39 Comment(2)
Answered a similar question here. - Papua
Since most image formats are already compressed (*.bmp being an exception), you can explicitly skip compression for those files. I am surprised that you want such a huge zip file. Is it meant as your own read-only file system? - Trafalgar
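
The suggestion in the comment above, skipping compression for images that are already compressed, could look roughly like the following. This is a minimal sketch, not code from any of the answers: the class and method names are hypothetical, and an in-memory byte array stands in for a photo downloaded from S3. It uses compression level 0 rather than the STORED method, because STORED requires the entry size and CRC to be known before the entry is written, which does not fit a streaming setup.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.Deflater;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

final class UncompressedZipSketch {

    /** Writes a single entry to a ZIP stream without spending CPU on compression. */
    static void writeUncompressed(OutputStream target, String entryName, byte[] content) throws IOException {
        // Note: closing the ZipOutputStream also closes the target stream.
        try (ZipOutputStream zip = new ZipOutputStream(target)) {
            // Level 0 keeps the streaming-friendly DEFLATED method but stores the
            // bytes essentially as-is (plus a small framing overhead).
            zip.setLevel(Deflater.NO_COMPRESSION);
            zip.putNextEntry(new ZipEntry(entryName));
            zip.write(content);
            zip.closeEntry();
        }
    }

    /** Reads the first entry back, to check that the archive is valid. */
    static byte[] readFirstEntry(byte[] zipBytes) throws IOException {
        try (ZipInputStream zip = new ZipInputStream(new ByteArrayInputStream(zipBytes))) {
            zip.getNextEntry();
            return zip.readAllBytes(); // reads up to the end of the current entry
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] fakePhoto = new byte[100_000]; // placeholder for already-compressed image data
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUncompressed(out, "photo.png", fakePhoto);
        System.out.println("input bytes: " + fakePhoto.length + ", zip bytes: " + out.size());
    }
}
```

The resulting archive is slightly larger than the input, which is the expected trade-off: no size reduction, but also no time spent recompressing data that would not shrink anyway.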

The basic idea is to use streaming operations. This way you don't wait until the whole ZIP has been generated on a filesystem; the upload starts as soon as the ZIP algorithm produces any data. Some data will still be buffered in memory, but there is no need to wait for the whole ZIP to be written to disk. We'll also use stream composition and PipedInputStream / PipedOutputStream in two threads: one reads the photos and zips them, and the other uploads the zipped data.

Here is a version for the AWS SDK for Java 1.x:

final AmazonS3 client = AmazonS3ClientBuilder.defaultClient();

final PipedOutputStream pipedOutputStream = new PipedOutputStream();
final PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream);

final Thread s3In = new Thread(() -> {
    try (final ZipOutputStream zipOutputStream = new ZipOutputStream(pipedOutputStream)) {
        S3Objects
                // It's just a convenient way to list all the objects. Replace with your own logic.
                .inBucket(client, "bucket")
                .forEach((S3ObjectSummary objectSummary) -> {
                    try {
                        if (objectSummary.getKey().endsWith(".png")) {
                            System.out.println("Processing " + objectSummary.getKey());

                            final ZipEntry entry = new ZipEntry(
                                    UUID.randomUUID().toString() + ".png" // I'm too lazy to extract file name from the
                                    // objectSummary
                            );

                            zipOutputStream.putNextEntry(entry);

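                            // Close the S3 object after copying so its HTTP connection is released.
                            try (final S3Object s3Object = client.getObject(
                                    objectSummary.getBucketName(),
                                    objectSummary.getKey()
                            )) {
                                IOUtils.copy(s3Object.getObjectContent(), zipOutputStream);
                            }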

                            zipOutputStream.closeEntry();
                        }
                    } catch (final Exception all) {
                        all.printStackTrace();
                    }
                });
    } catch (final Exception all) {
        all.printStackTrace();
    }
});
final Thread s3Out = new Thread(() -> {
    try {
        client.putObject(
                "another-bucket",
                "previews.zip",
                pipedInputStream,
                new ObjectMetadata()
        );

        pipedInputStream.close();
    } catch (final Exception all) {
        all.printStackTrace();
    }
});

s3In.start();
s3Out.start();

s3In.join();
s3Out.join();

However, note that it will print a warning:

WARNING: No content length specified for stream data.  Stream contents will be buffered in memory and could result in out of memory errors.

That's because S3 needs to know the size of the data before the upload starts, and the size of the resulting ZIP cannot be known in advance. You can try multipart uploads, but the code will be trickier. The idea would be similar, though: one thread reads the data and writes it to the ZIP stream, and the other thread reads the zipped bytes and uploads them as parts. After all the parts are uploaded, the multipart upload must be completed.
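
The part-splitting half of that idea can be sketched without any AWS calls. The code below is an illustration under stated assumptions, not part of the original answer: `PartSplitterSketch` and `PartSink` are hypothetical names, and `PartSink` stands in for whatever call uploads one part. In a real setup the input would be the PipedInputStream fed by the zipping thread, and the part size would have to respect S3's rule that every part except the last is at least 5 MB.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PartSplitterSketch {

    /** Hypothetical stand-in for the call that uploads one part of a multipart upload. */
    interface PartSink {
        void accept(int partNumber, byte[] part) throws IOException;
    }

    /**
     * Reads the stream to its end and emits parts of exactly partSize bytes;
     * only the last part may be shorter. Returns the number of parts emitted.
     */
    static int split(InputStream in, int partSize, PartSink sink) throws IOException {
        byte[] buffer = new byte[partSize];
        int partNumber = 0;
        while (true) {
            // readNBytes blocks until the buffer is full or the stream ends.
            int filled = in.readNBytes(buffer, 0, partSize);
            if (filled == 0) {
                break; // nothing left to send
            }
            sink.accept(++partNumber, Arrays.copyOf(buffer, filled));
            if (filled < partSize) {
                break; // a short read means the stream has ended
            }
        }
        return partNumber;
    }

    public static void main(String[] args) throws IOException {
        List<Integer> sizes = new ArrayList<>();
        int parts = split(new ByteArrayInputStream(new byte[25]), 10, (n, part) -> sizes.add(part.length));
        System.out.println(parts + " parts with sizes " + sizes);
    }
}
```

Only one part-sized buffer is held in memory at a time, so memory use depends on the part size rather than on the size of the ZIP.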

Here is an example for the AWS SDK for Java 2.x:

final S3Client client = S3Client.create();

final PipedOutputStream pipedOutputStream = new PipedOutputStream();
final PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream);

final Thread s3In = new Thread(() -> {
    try (final ZipOutputStream zipOutputStream = new ZipOutputStream(pipedOutputStream)) {
        client.listObjectsV2Paginator(
                ListObjectsV2Request
                        .builder()
                        .bucket("bucket")
                        .build()
        )
                .contents()
                .forEach((S3Object object) -> {
                    try {
                        if (object.key().endsWith(".png")) {
                            System.out.println("Processing " + object.key());

                            final ZipEntry entry = new ZipEntry(
                                    UUID.randomUUID().toString() + ".png" // I'm too lazy to extract file name from the object
                            );

                            zipOutputStream.putNextEntry(entry);

                            client.getObject(
                                    GetObjectRequest
                                            .builder()
                                            .bucket("bucket")
                                            .key(object.key())
                                            .build(),
                                    ResponseTransformer.toOutputStream(zipOutputStream)
                            );

                            zipOutputStream.closeEntry();
                        }
                    } catch (final Exception all) {
                        all.printStackTrace();
                    }
                });
    } catch (final Exception all) {
        all.printStackTrace();
    }
});
final Thread s3Out = new Thread(() -> {
    try {
        client.putObject(
                PutObjectRequest
                        .builder()
                        .bucket("another-bucket")
                        .key("previews.zip")
                        .build(),
                RequestBody.fromBytes(
                        IOUtils.toByteArray(pipedInputStream)
                )
        );
    } catch (final Exception all) {
        all.printStackTrace();
    }
});

s3In.start();
s3Out.start();

s3In.join();
s3Out.join();

It suffers from the same problem: the whole ZIP has to be held in memory before the upload.

If you're interested, I've prepared a demo project, so you can play with the code.

Stereochemistry answered 15/8, 2019 at 11:21 Comment(0)

The problem is that the AWS Java SDK for S3 does not offer a way to write to an object through an OutputStream. The following snippet implements an 'S3OutputStream', which extends OutputStream and automatically performs either a 'putObject' or a multipart upload ('initiateMultipartUpload'), depending on the size. This allows you to pass the S3OutputStream to the constructor of ZipOutputStream, e.g. new ZipOutputStream(new S3OutputStream(s3Client, "my_bucket", "path"))

import java.io.ByteArrayInputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3OutputStream extends OutputStream {

    private static final Logger LOG = LoggerFactory.getLogger(S3OutputStream.class);

    /** Default chunk size is 10MB */
    protected static final int BUFFER_SIZE = 10000000;

    /** The bucket-name on Amazon S3 */
    private final String bucket;

    /** The path (key) name within the bucket */
    private final String path;

    /** The temporary buffer used for storing the chunks */
    private final byte[] buf;

    /** The position in the buffer */
    private int position;

    /** Amazon S3 client. TODO: support KMS */
    private final AmazonS3 s3Client;

    /** The unique id for this upload */
    private String uploadId;

    /** Collection of the etags for the parts that have been uploaded */
    private final List<PartETag> etags;

    /** indicates whether the stream is still open / valid */
    private boolean open;

    /**
     * Creates a new S3 OutputStream
     * @param s3Client the AmazonS3 client
     * @param bucket name of the bucket
     * @param path path within the bucket
     */
    public S3OutputStream(AmazonS3 s3Client, String bucket, String path) {
        this.s3Client = s3Client;
        this.bucket = bucket;
        this.path = path;
        this.buf = new byte[BUFFER_SIZE];
        this.position = 0;
        this.etags = new ArrayList<>();
        this.open = true;
    }

    /**
     * Write an array to the S3 output stream.
     *
     * @param b the byte-array to append
     */
    @Override
    public void write(byte[] b) {
        write(b,0,b.length);
    }

    /**
     * Writes an array to the S3 Output Stream
     *
     * @param byteArray the array to write
     * @param o the offset into the array
     * @param l the number of bytes to write
     */
    @Override
    public void write(final byte[] byteArray, final int o, final int l) {
        this.assertOpen();
        int ofs = o, len = l;
        int size;
        while (len > (size = this.buf.length - position)) {
            System.arraycopy(byteArray, ofs, this.buf, this.position, size);
            this.position += size;
            flushBufferAndRewind();
            ofs += size;
            len -= size;
        }
        System.arraycopy(byteArray, ofs, this.buf, this.position, len);
        this.position += len;
    }

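    /**
     * Does not upload anything: parts are only uploaded when the buffer is full or the stream is closed,
     * because S3 requires every part except the last one to be at least 5 MB.
     */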
    @Override
    public synchronized void flush() {
        this.assertOpen();
        LOG.debug("Flush was called");
    }

    protected void flushBufferAndRewind() {
        if (uploadId == null) {
            LOG.debug("Starting a multipart upload for {}/{}",this.bucket,this.path);
            final InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(this.bucket, this.path)
                    .withCannedACL(CannedAccessControlList.BucketOwnerFullControl);
            InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(request);
            this.uploadId = initResponse.getUploadId();
        }
        uploadPart();
        this.position = 0;
    }

    protected void uploadPart() {
        LOG.debug("Uploading part {}",this.etags.size());
        UploadPartResult uploadResult = this.s3Client.uploadPart(new UploadPartRequest()
                .withBucketName(this.bucket)
                .withKey(this.path)
                .withUploadId(this.uploadId)
                .withInputStream(new ByteArrayInputStream(buf,0,this.position))
                .withPartNumber(this.etags.size() + 1)
                .withPartSize(this.position));
        this.etags.add(uploadResult.getPartETag());
    }

    @Override
    public void close() {
        if (this.open) {
            this.open = false;
            if (this.uploadId != null) {
                if (this.position > 0) {
                    uploadPart();
                }
                LOG.debug("Completing multipart");
                this.s3Client.completeMultipartUpload(new CompleteMultipartUploadRequest(bucket, path, uploadId, etags));
            }
            else {
                LOG.debug("Uploading object at once to {}/{}",this.bucket,this.path);
                final ObjectMetadata metadata = new ObjectMetadata();
                metadata.setContentLength(this.position);
                final PutObjectRequest request = new PutObjectRequest(this.bucket, this.path, new ByteArrayInputStream(this.buf, 0, this.position), metadata)
                        .withCannedAcl(CannedAccessControlList.BucketOwnerFullControl);
                this.s3Client.putObject(request);
            }
        }
    }

    public void cancel() {
        this.open = false;
        if (this.uploadId != null) {
            LOG.debug("Aborting multipart upload");
            this.s3Client.abortMultipartUpload(new AbortMultipartUploadRequest(this.bucket, this.path, this.uploadId));
        }
    }

    @Override
    public void write(int b) {
        this.assertOpen();
        if (position >= this.buf.length) {
            flushBufferAndRewind();
        }
        this.buf[position++] = (byte)b;
    }

    private void assertOpen() {
        if (!this.open) {
            throw new IllegalStateException("Closed");
        }
    }
}
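
How a ZipOutputStream interacts with a chunking stream like the one above can be tried out locally. The following is a minimal sketch with an in-memory stand-in: `ChunkCollector` is a hypothetical class that collects fixed-size chunks in a list where S3OutputStream would upload parts, so nothing here talks to S3. It shows that closing the ZIP stream closes the underlying stream, which is the point where the final, shorter chunk is emitted.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

final class ZipToChunkedStreamSketch {

    /** In-memory stand-in for a part-uploading stream: collects fixed-size chunks in a list. */
    static final class ChunkCollector extends OutputStream {
        final List<byte[]> chunks = new ArrayList<>();
        private final ByteArrayOutputStream current = new ByteArrayOutputStream();
        private final int chunkSize;

        ChunkCollector(int chunkSize) {
            this.chunkSize = chunkSize;
        }

        @Override
        public void write(int b) {
            current.write(b);
            if (current.size() == chunkSize) {
                emit(); // a real implementation would upload a part here
            }
        }

        @Override
        public void close() {
            if (current.size() > 0) {
                emit(); // the last chunk may be shorter than chunkSize
            }
        }

        private void emit() {
            chunks.add(current.toByteArray());
            current.reset();
        }
    }

    /** Zips one text entry straight into the collector and returns the emitted chunks. */
    static List<byte[]> zipInto(ChunkCollector sink, String name, String text) throws IOException {
        try (ZipOutputStream zip = new ZipOutputStream(sink)) {
            zip.putNextEntry(new ZipEntry(name));
            zip.write(text.getBytes(StandardCharsets.UTF_8));
            zip.closeEntry();
        } // closing the ZIP stream also closes the sink, which emits the final chunk
        return sink.chunks;
    }

    /** Joins the chunks and reads the first entry back as text. */
    static String readBack(List<byte[]> chunks) throws IOException {
        ByteArrayOutputStream joined = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            joined.write(chunk);
        }
        try (ZipInputStream zip = new ZipInputStream(new ByteArrayInputStream(joined.toByteArray()))) {
            zip.getNextEntry();
            return new String(zip.readAllBytes(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        List<byte[]> chunks = zipInto(new ChunkCollector(64), "hello.txt", "hello from a streamed zip");
        System.out.println(chunks.size() + " chunk(s), content: " + readBack(chunks));
    }
}
```

The same try-with-resources pattern applies when the sink is an S3OutputStream: the upload is only completed once the ZipOutputStream is closed, so the close must not be skipped.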

Abb answered 8/10, 2020 at 13:19 Comment(0)

I am very late to answer this question, but I did this for my latest project yesterday. Have a look at the full code below.

Say that uploading a file to S3 returns the object key of the uploaded file; I created a class for that and named it FileKey.

package com.myprojectName.model.key;

import java.time.Instant;

import javax.persistence.Entity;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Entity
@NoArgsConstructor
public class FileKey {

    private String fileObjectKey;
    
    private String fileName;
    
    private int fileSize;
    
    private String fileType;
    
}

The return values of the presigned URL are stored in FileDownloadDetailsDTO:

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

import java.net.URL;

@NoArgsConstructor
@AllArgsConstructor
@Getter
@Builder
public class FileDownloadDetailsDTO {

    private String name;
    private Long size;
    private String contentType;
    private URL preSignedDownloadUrl;

    public FileDownloadDetailsDTO(PreSignedUrlAndMetadata objectMetadata) {
        this.name = objectMetadata.getName();
        this.size = objectMetadata.getSize();
        this.contentType = objectMetadata.getContentType();
        this.preSignedDownloadUrl = objectMetadata.getUrl();
    }

}

PreSignedUrlAndMetadata contains the URL created for the S3 object. If unsure, look at the code below:

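import java.net.URL;

import lombok.AllArgsConstructor;
import lombok.Getter;

// The Lombok annotations are inferred from how the class is used elsewhere in this answer:
// it is built with (url, name, contentType, size) and read through getters.
@Getter
@AllArgsConstructor
public class PreSignedUrlAndMetadata {

    private final URL url;

    private final String name;

    private final String contentType;

    private final Long size;

}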

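The method below reads each file from the S3 bucket, adds it to the ZIP as a zip entry, uploads the ZIP, and returns a presigned URL for it. Note that, as written, it builds the ZIP in a local temp file (File.createTempFile) before uploading, so it is still bound by the local temporary storage limit.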

 public FileDownloadDetailsDTO getDownloadFilesInZipDetails(String zipFileName, List<FileKey> files) {

        PreSignedUrlAndMetadata preSignedUrlAndMetadata;
        File zipFile = null;
        try {
            zipFile = File.createTempFile(zipFileName, "file");

            try (FileOutputStream fos = new FileOutputStream(zipFile); ZipOutputStream zos = new ZipOutputStream(fos)) {

                for (FileKey file : files) {
                    // Fall back to the object key when no file name is set; new ZipEntry(null) would throw.
                    String name = ObjectUtils.isNotEmpty(file.getFileName())
                            ? file.getFileName()
                            : file.getFileObjectKey();
                    ZipEntry entry = new ZipEntry(name);

                    try (InputStream inputStream = getInputStreamForFileKey(file.getFileObjectKey())) {
                        zos.putNextEntry(entry);
                        IOUtils.copy(inputStream, zos);
                        zos.closeEntry();
                    }
                }
            }

            try (FileInputStream fis = new FileInputStream(zipFile)) {
                // NOTE: url, metadata and contentType used below are not defined anywhere in this snippet.
                TempFileObjectKey fileObjectKey = uploadTemp(fis, zipFile.length());
                preSignedUrlAndMetadata = new PreSignedUrlAndMetadata(url, metadata.getUserMetaDataOf(USER_METADATA_NAME), contentType, metadata.getContentLength());
            }

        } catch (Exception e) {
            throw new ApplicationException("Error while creating zip file for " + zipFileName, e, ApplicationErrorCode.INTERNAL_SERVER_ERROR);
        } finally {
            FileUtils.deleteQuietly(zipFile);
        }

        return FileDownloadDetailsDTO.builder().name(zipFileName + ".zip")
                .size(preSignedUrlAndMetadata.getSize()).preSignedDownloadUrl(preSignedUrlAndMetadata.getUrl()).build();

    }

 public InputStream getInputStreamForFileKey(String key) {
    TempFileObjectKey tempFileObjectKey = new TempFileObjectKey(getActualPrefix(key));
    return storageService.getInputStream(tempFileObjectKey);
}


String getActualPrefix(String prefix){
    return prefix.replaceAll("_","/");
}

public TempFileObjectKey uploadTemp(InputStream inputStream, Long length) {
    TempFileObjectKey tempFileObjectKey = s3StorageManager.buildTempFileFullKey();
    ObjectMetadata objectMetadata = new ObjectMetadata();
    if (length != null) {
        objectMetadata.setContentLength(length);
    }
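    // upload(...) is an instance method, so a TransferManager instance is needed; "transferManager" is
    // assumed to be a field of this class. The original referred to an undefined "abstractObjectKey";
    // the key built above is used instead.
    Upload upload = transferManager.upload(getBucketName(tempFileObjectKey), tempFileObjectKey.getObjectKey(), inputStream, objectMetadata);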
    try {
        upload.waitForCompletion();
    } catch (InterruptedException e) {
        throw new ApplicationException(e.getMessage(), e, ApplicationErrorCode.INTERNAL_SERVER_ERROR);
    }
    return tempFileObjectKey;
}

I hope this helps. Feel free to ask if you still have any doubts. Thanks.

Feet answered 25/1, 2022 at 6:41 Comment(1)
How do I unzip the archive through Lambda and move the unzipped files to an existing key in the same bucket? Say I have a zip file uploaded to folder 1, which triggers my Lambda; the Lambda unzips the file, and now I want to move the result to another folder in the same bucket. How can I achieve that, given that S3 uses flat storage and has no concept of folders? - Omission

I've created a GitHub repository with my implementation, which you can check out here: https://github.com/yufeikang/serverless-zip-s3. The repository includes a Lambda function written in Node.js that you can use to zip your S3 directory, as well as instructions on how to deploy and use the function.

Amon answered 28/2, 2023 at 14:48 Comment(1)
It would be great if you could add some more details about your package structure and how to use it. The user may not be familiar with Node.js or Lambda. - Isagoge
