Rusoto streamed upload using sigv4
I'm having trouble streaming uploads to S3:

// rust version 1.42.0
// OS macos
// [dependencies]
// rusoto_core = "0.43.0"
// rusoto_s3 = "0.43.0"
// log = "0.4"
// pretty_env_logger = "0.4.0"
// tokio = "0.2.14"
// tokio-util = { version = "0.3.1", features = ["codec"] }
// futures = "0.3.4"
// bytes = "0.5.4"

#![allow(dead_code)]
#![allow(unused_imports)]

use log::{debug,info,warn,error};
use bytes::Bytes;
use tokio_util::codec;
use futures::stream::{Stream, TryStreamExt};
use rusoto_core::Region;
use rusoto_s3::{PutObjectRequest, S3, S3Client};
use tokio::io::{AsyncRead, Result};

#[tokio::main]
async fn main() {
    pretty_env_logger::init();
    let pathbuf = std::path::PathBuf::from(String::from("/tmp/test.bin"));
    copy_to_s3(&pathbuf).await;
}

async fn copy_to_s3(pathbuf: &std::path::PathBuf) {
    debug!("copy_to_s3: {:?}", pathbuf);

    let tokio_file = tokio::fs::File::open(pathbuf.as_path()).await;

    let filename = pathbuf.file_name().unwrap().to_str().unwrap();
    debug!("filename: {:?}", filename);
    let s3_client = S3Client::new(Region::EuWest2);
    let result = s3_client.put_object(PutObjectRequest {
        bucket: String::from("..."),
        key: filename.to_owned(),
        server_side_encryption: Some("AES256".to_string()),

        body: Some(rusoto_core::ByteStream::new(into_bytes_stream(tokio_file.unwrap()))),
        ..Default::default()
    }).await;

    match result {
        Ok(success) => { 
            debug!("Success: {:?}", success);
        },
        Err(error) => {
            error!("Failure: {:?}", error);
        }
    }

    debug!("DONE: copy_to_s3: {:?}", pathbuf);
}

// Convert an AsyncRead into a Stream of Bytes chunks: BytesCodec yields
// BytesMut frames as data arrives, and freeze() makes each frame an
// immutable Bytes without copying.
fn into_bytes_stream<R>(r: R) -> impl Stream<Item = Result<Bytes>>
where
    R: AsyncRead,
{
    codec::FramedRead::new(r, codec::BytesCodec::new())
        .map_ok(|bytes| bytes.freeze())
}

I generate a binary file by using dd if=/dev/zero of=/tmp/test.bin bs=4k count=500.

Notwithstanding that I haven't quite wrapped my head around futures yet, I'm just trying to get something that dumps a file into S3 with the minimum amount of memory usage possible.

On running it, I get the following output with debug logging; potentially sensitive information is elided:

$ RUST_LOG=debug cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.36s
     Running `target/debug/uploader`
 DEBUG uploader > copy_to_s3: "/tmp/test.bin"
 DEBUG uploader > filename: "test.bin"
 DEBUG rusoto_core::request > Full request:
 method: PUT
 final_uri: https://s3.eu-west-2.amazonaws.com/.../test.bin
Headers:

 DEBUG rusoto_core::request > authorization:"AWS4-HMAC-SHA256 Credential=.../20200408/eu-west-2/s3/aws4_request, SignedHeaders=content-type;host;x-amz-content-sha256;x-amz-date;x-amz-security-token;x-amz-server-side-encryption, Signature=..."
 DEBUG rusoto_core::request > content-type:"application/octet-stream"
 DEBUG rusoto_core::request > host:"s3.eu-west-2.amazonaws.com"
 DEBUG rusoto_core::request > x-amz-content-sha256:"UNSIGNED-PAYLOAD"
 DEBUG rusoto_core::request > x-amz-date:"20200408T173930Z"
 DEBUG rusoto_core::request > x-amz-security-token:"..."
 DEBUG rusoto_core::request > x-amz-server-side-encryption:"AES256"
 DEBUG rusoto_core::request > user-agent:"rusoto/0.43.0 rust/1.42.0 macos"
 DEBUG hyper::client::connect::dns > resolving host="s3.eu-west-2.amazonaws.com"
 DEBUG hyper::client::connect::http > connecting to 52.95.148.48:443
 DEBUG hyper::client::connect::http > connected to 52.95.148.48:443
 DEBUG hyper::proto::h1::io         > flushed 1070 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 147600 bytes
 DEBUG hyper::proto::h1::io         > flushed 418200 bytes
 DEBUG hyper::proto::h1::io         > flushed 418200 bytes
 DEBUG hyper::proto::h1::io         > flushed 418200 bytes
 DEBUG hyper::proto::h1::io         > flushed 418200 bytes
 DEBUG hyper::proto::h1::io         > flushed 16405 bytes
 DEBUG hyper::proto::h1::io         > read 291 bytes
 DEBUG hyper::proto::h1::io         > parsed 7 headers
 DEBUG hyper::proto::h1::conn       > incoming body is chunked encoding
 DEBUG hyper::proto::h1::io         > read 345 bytes
 DEBUG hyper::proto::h1::decode     > incoming chunked header: 0x14D (333 bytes)
 DEBUG hyper::proto::h1::conn       > incoming body completed
 DEBUG rusoto_core::proto::xml::error > Ignoring unknown XML element "Header" in error response.
 DEBUG rusoto_core::proto::xml::error > Ignoring unknown XML element "RequestId" in error response.
 DEBUG rusoto_core::proto::xml::error > Ignoring unknown XML element "HostId" in error response.
 ERROR uploader                       > Failure: Unknown(BufferedHttpResponse {status: 501, body: "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<Error><Code>NotImplemented</Code><Message>A header you provided implies functionality that is not implemented</Message><Header>Transfer-Encoding</Header><RequestId>3F1A03D67D81CCAB</RequestId><HostId>...=</HostId></Error>", headers: {"x-amz-request-id": "3F1A03D67D81CCAB", "x-amz-id-2": "...", "content-type": "application/xml", "transfer-encoding": "chunked", "date": "Wed, 08 Apr 2020 17:39:30 GMT", "connection": "close", "server": "AmazonS3"} })
 DEBUG uploader                       > DONE: copy_to_s3: "/tmp/test.bin"

I think this is telling me that it's not a sigv4-signed upload, but I'm not sure.

For the most part, the debug output looks like it's successfully sending the file in chunks, but then it errors out.

Given my assumption that it's being signed with sigv2 and not sigv4, how do I go about making it send the sigv4 headers instead? Failing that, what have I missed?

Desiccated answered 8/4, 2020 at 15:51 Comment(3)
It's hard to answer your question because it doesn't include a minimal reproducible example. We can't tell what crates (and their versions), types, traits, fields, etc. are present in the code. It would make it easier for us to help you if you try to reproduce your error on the Rust Playground if possible, otherwise in a brand new Cargo project, then edit your question to include the additional info. There are Rust-specific MRE tips you can use to reduce your original code for posting here. Thanks! - Nickelous
On it. Will post back shortly. - Desiccated
Done. MRE added. More complete output added. - Desiccated

Content-Length should be specified.

Without it, the HTTP client doesn't know the body size up front, so it sends the request with Transfer-Encoding: chunked, and S3's PutObject does not accept chunked requests. That is exactly what the 501 NotImplemented response naming the Transfer-Encoding header is telling you. The request was already sigv4-signed; note the AWS4-HMAC-SHA256 authorization header and x-amz-content-sha256: UNSIGNED-PAYLOAD in your log. The signature was never the problem.
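
For intuition: with no known length, each 8192-byte frame from BytesCodec goes out as one HTTP/1.1 chunk, which is what the repeated 8200-byte flushes in the log are (8192 bytes of data, plus the 6-byte chunk-size line "2000\r\n", plus a trailing "\r\n"). An illustrative wire format, not a capture from this run:

PUT /.../test.bin HTTP/1.1
Transfer-Encoding: chunked

2000
<8192 bytes of data>
2000
<8192 bytes of data>
...
0

With content_length set, the same request instead carries a Content-Length header and the body goes out as a single run of raw bytes, which S3 accepts.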

Changed section

let result = s3_client.put_object(PutObjectRequest {
    bucket: String::from("..."),
    key: filename.to_owned(),
    server_side_encryption: Some("AES256".to_string()),
    // Based on dd if=/dev/zero of=/tmp/test.bin bs=4k count=500,
    // i.e. 4096 bytes * 500 = 2_048_000 bytes.
    content_length: Some(2_048_000),
    body: Some(rusoto_core::ByteStream::new(into_bytes_stream(tokio_file.unwrap()))),
    ..Default::default()
}).await;

Full text of the fixed example

// rust version 1.42.0
// OS macos
// [dependencies]
// rusoto_core = "0.43.0"
// rusoto_s3 = "0.43.0"
// log = "0.4"
// pretty_env_logger = "0.4.0"
// tokio = "0.2.14"
// tokio-util = { version = "0.3.1", features = ["codec"] }
// futures = "0.3.4"
// bytes = "0.5.4"

#![allow(dead_code)]
#![allow(unused_imports)]

use log::{debug,info,warn,error};
use bytes::Bytes;
use tokio_util::codec;
use futures::stream::{Stream, TryStreamExt};
use rusoto_core::Region;
use rusoto_s3::{PutObjectRequest, S3, S3Client};
use tokio::io::{AsyncRead, Result};

#[tokio::main]
async fn main() {
    pretty_env_logger::init();
    let pathbuf = std::path::PathBuf::from(String::from("/tmp/test.bin"));
    copy_to_s3(&pathbuf).await;
}

async fn copy_to_s3(pathbuf: &std::path::PathBuf) {
    debug!("copy_to_s3: {:?}", pathbuf);

    let tokio_file = tokio::fs::File::open(pathbuf.as_path()).await;

    let filename = pathbuf.file_name().unwrap().to_str().unwrap();
    debug!("filename: {:?}", filename);
    let s3_client = S3Client::new(Region::EuWest2);
    let result = s3_client.put_object(PutObjectRequest {
        bucket: String::from("..."),
        key: filename.to_owned(),
        server_side_encryption: Some("AES256".to_string()),
        // Based on dd if=/dev/zero of=/tmp/test.bin bs=4k count=500,
        // i.e. 4096 bytes * 500 = 2_048_000 bytes.
        content_length: Some(2_048_000),
        body: Some(rusoto_core::ByteStream::new(into_bytes_stream(tokio_file.unwrap()))),
        ..Default::default()
    }).await;

    match result {
        Ok(success) => { 
            debug!("Success: {:?}", success);
        },
        Err(error) => {
            error!("Failure: {:?}", error);
        }
    }

    debug!("DONE: copy_to_s3: {:?}", pathbuf);
}

// Convert an AsyncRead into a Stream of Bytes chunks: BytesCodec yields
// BytesMut frames as data arrives, and freeze() makes each frame an
// immutable Bytes without copying.
fn into_bytes_stream<R>(r: R) -> impl Stream<Item = Result<Bytes>>
where
    R: AsyncRead,
{
    codec::FramedRead::new(r, codec::BytesCodec::new())
        .map_ok(|bytes| bytes.freeze())
}

The code works as intended; however, you have to know the length of the file beforehand.
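
If you'd rather not hardcode the size, a small variation is to read it from the file's metadata before building the request. A minimal sketch, assuming tokio 0.2's async File::metadata and otherwise the same request as above:

// Sketch: derive content_length from the file's metadata instead of
// hardcoding it. Assumes the file is not modified between this call
// and the upload.
let tokio_file = tokio::fs::File::open(pathbuf.as_path()).await.unwrap();
let length = tokio_file.metadata().await.unwrap().len();

let result = s3_client.put_object(PutObjectRequest {
    bucket: String::from("..."),
    key: filename.to_owned(),
    server_side_encryption: Some("AES256".to_string()),
    // content_length is an Option<i64> in PutObjectRequest.
    content_length: Some(length as i64),
    body: Some(rusoto_core::ByteStream::new(into_bytes_stream(tokio_file))),
    ..Default::default()
}).await;

If the length genuinely isn't known in advance (a network stream, say), the usual workaround is S3's multipart upload API, which buffers one fixed-size part at a time so each part is sent with a known Content-Length. The helper below is an illustrative sketch only: the name multipart_copy_to_s3 is made up, error handling is reduced to the ? operator, and it assumes tokio's io-util feature for AsyncReadExt.

use rusoto_s3::{
    CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart,
    CreateMultipartUploadRequest, S3, S3Client, UploadPartRequest,
};
use tokio::io::AsyncReadExt;

// Stream a reader of unknown length to S3 via multipart upload, holding at
// most one 5 MiB part in memory at a time. Empty inputs are not handled.
async fn multipart_copy_to_s3<R>(
    s3_client: &S3Client,
    mut reader: R,
    bucket: &str,
    key: &str,
) -> std::result::Result<(), Box<dyn std::error::Error>>
where
    R: tokio::io::AsyncRead + Unpin,
{
    let upload = s3_client
        .create_multipart_upload(CreateMultipartUploadRequest {
            bucket: bucket.to_owned(),
            key: key.to_owned(),
            ..Default::default()
        })
        .await?;
    let upload_id = upload.upload_id.expect("S3 returned no upload id");

    let mut parts = Vec::new();
    let mut part_number = 1;
    loop {
        // 5 MiB is S3's minimum part size; only the final part may be smaller.
        let mut buf = vec![0u8; 5 * 1024 * 1024];
        let mut filled = 0;
        while filled < buf.len() {
            let n = reader.read(&mut buf[filled..]).await?;
            if n == 0 {
                break; // EOF
            }
            filled += n;
        }
        if filled == 0 {
            break; // nothing left to upload
        }
        buf.truncate(filled);

        let part = s3_client
            .upload_part(UploadPartRequest {
                bucket: bucket.to_owned(),
                key: key.to_owned(),
                upload_id: upload_id.clone(),
                part_number,
                // Each part's length is known, so no chunked encoding is used.
                content_length: Some(filled as i64),
                body: Some(buf.into()),
                ..Default::default()
            })
            .await?;
        parts.push(CompletedPart {
            e_tag: part.e_tag,
            part_number: Some(part_number),
        });
        part_number += 1;
    }

    s3_client
        .complete_multipart_upload(CompleteMultipartUploadRequest {
            bucket: bucket.to_owned(),
            key: key.to_owned(),
            upload_id,
            multipart_upload: Some(CompletedMultipartUpload { parts: Some(parts) }),
            ..Default::default()
        })
        .await?;
    Ok(())
}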

Zoosperm answered 19/4, 2020 at 2:20 Comment(1)
Thank you very much, much appreciated. Can you explain what was happening and why it needs the file length? I'll hazard a guess that the content length is part of the sigv4 header? - Desiccated
