How do I read the entire body of a Tokio-based Hyper request?

I want to write a server using the current master branch of Hyper that saves a message that is delivered by a POST request and sends this message to every incoming GET request.

I have this, mostly copied from the Hyper examples directory:

extern crate futures;
extern crate hyper;
extern crate pretty_env_logger;

use futures::future::FutureResult;

use hyper::{Get, Post, StatusCode};
use hyper::header::{ContentLength};
use hyper::server::{Http, Service, Request, Response};
use futures::Stream;

struct Echo {
    data: Vec<u8>,
}

impl Echo {
    fn new() -> Self {
        Echo {
            data: "text".into(),
        }
    }
}

impl Service for Echo {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = FutureResult<Response, hyper::Error>;

    fn call(&self, req: Self::Request) -> Self::Future {
        let resp = match (req.method(), req.path()) {
            (&Get, "/") | (&Get, "/echo") => {
                Response::new()
                    .with_header(ContentLength(self.data.len() as u64))
                    .with_body(self.data.clone())
            },
            (&Post, "/") => {
                //self.data.clear(); // argh. &self is not mutable :(
                // even if it was mutable... how to put the entire body into it?
                //req.body().fold(...) ?
                let mut res = Response::new();
                if let Some(len) = req.headers().get::<ContentLength>() {
                    res.headers_mut().set(ContentLength(0));
                }
                res.with_body(req.body())
            },
            _ => {
                Response::new()
                    .with_status(StatusCode::NotFound)
            }
        };
        futures::future::ok(resp)
    }

}


fn main() {
    pretty_env_logger::init().unwrap();
    let addr = "127.0.0.1:12346".parse().unwrap();

    let server = Http::new().bind(&addr, || Ok(Echo::new())).unwrap();
    println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
    server.run().unwrap();
}

How do I turn the req.body() (which seems to be a Stream of Chunks) into a Vec<u8>? I assume I must somehow return a Future that consumes the Stream and turns it into a single Vec<u8>, maybe with fold(). But I have no clue how to do that.

Tuft answered 14/4, 2017 at 22:21 Comment(10)
"How to share mutable state for a Hyper handler?" answers half of your question, so I've reworded your question to focus on the unique aspect and help avoid downvotes. - Rodmun
Thank you for your first edit @Shepmaster. After that it looked really polished. However, I don't like your second edit. I don't see how the linked question answers my question. They don't even implement the trait Service. - Tuft
You are always able to roll back any edit you disagree with from the revision list or perform further edits. - Rodmun
However, the answer is the same, you will need thread-safe interior mutability such as a Mutex, Atomic*, or RwLock. - Rodmun
Please be aware that you are expected to have one question per question, as well. - Rodmun
Ok, then I will just open a new question for the first one. Thank you. - Tuft
Sounds good. In that case, I'd also advise you to create a minimal reproducible example and use a released version of Hyper. Linking to the question I suggested above and showing why it is not valid for your case will go a long way as well! - Rodmun
@Rodmun seems like the Tokio developers want to modify or maybe even delete the Service trait. So maybe my other question will resolve itself, once tokio-service-0.2 is out. - Tuft
Good to know, but tokio::Service is presumably different from hyper::Service, right? Even if Tokio removes it, would Hyper keep it? - Rodmun
@Rodmun I assume Hyper just copied the trait, when they started to migrate to Tokio. But yeah, maybe they will keep it, even if Tokio does not. - Tuft
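The interior mutability mentioned in the comments can be sketched with the standard library alone. This is only an illustration of the idea, not code from Hyper: Store, set and get are hypothetical names, and a real handler would call them from inside the service.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for the `Echo` service: the stored message lives
/// behind a `Mutex`, so it can be replaced through a shared `&self`.
struct Store {
    data: Mutex<Vec<u8>>,
}

impl Store {
    fn new() -> Self {
        Store {
            data: Mutex::new(b"text".to_vec()),
        }
    }

    /// What a POST handler could do once it holds the whole body.
    fn set(&self, body: &[u8]) {
        let mut data = self.data.lock().expect("mutex poisoned");
        data.clear();
        data.extend_from_slice(body);
    }

    /// What a GET handler could do: hand out a copy of the stored message.
    fn get(&self) -> Vec<u8> {
        self.data.lock().expect("mutex poisoned").clone()
    }
}
```

Both methods take &self, which is what the call method of the service provides. Sharing one Store between several service instances would additionally need something like an Arc around it.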
R
15

I'm going to simplify the problem to just return the total number of bytes, instead of echoing the entire stream.

Futures 0.3

Hyper 0.13 + TryStreamExt::try_fold

See euclio's answer about hyper::body::to_bytes if you just want all the data as one giant blob.

Accessing the stream allows for more fine-grained control:

use futures::TryStreamExt; // 0.3.7
use hyper::{server::Server, service, Body, Method, Request, Response}; // 0.13.9
use std::convert::Infallible;
use tokio; // 0.2.22

#[tokio::main]
async fn main() {
    let addr = "127.0.0.1:12346".parse().expect("Unable to parse address");

    let server = Server::bind(&addr).serve(service::make_service_fn(|_conn| async {
        Ok::<_, Infallible>(service::service_fn(echo))
    }));

    println!("Listening on http://{}.", server.local_addr());

    if let Err(e) = server.await {
        eprintln!("Error: {}", e);
    }
}

async fn echo(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    let (parts, body) = req.into_parts();
    match (parts.method, parts.uri.path()) {
        (Method::POST, "/") => {
            let entire_body = body
                .try_fold(Vec::new(), |mut data, chunk| async move {
                    data.extend_from_slice(&chunk);
                    Ok(data)
                })
                .await;

            entire_body.map(|body| {
                let body = Body::from(format!("Read {} bytes", body.len()));
                Response::new(body)
            })
        }
        _ => {
            let body = Body::from("Can only POST to /");
            Ok(Response::new(body))
        }
    }
}

Unfortunately, the current implementation of Bytes is no longer compatible with TryStreamExt::try_concat, so we have to switch back to a fold.

Futures 0.1

hyper 0.12 + Stream::concat2

Since futures 0.1.14, you can use Stream::concat2 to stick together all the data into one:

fn concat2(self) -> Concat2<Self>
where
    Self: Sized,
    Self::Item: Extend<<Self::Item as IntoIterator>::Item> + IntoIterator + Default,

use futures::{
    future::{self, Either},
    Future, Stream,
}; // 0.1.25

use hyper::{server::Server, service, Body, Method, Request, Response}; // 0.12.20

use tokio; // 0.1.14

fn main() {
    let addr = "127.0.0.1:12346".parse().expect("Unable to parse address");

    let server = Server::bind(&addr).serve(|| service::service_fn(echo));

    println!("Listening on http://{}.", server.local_addr());

    let server = server.map_err(|e| eprintln!("Error: {}", e));
    tokio::run(server);
}

fn echo(req: Request<Body>) -> impl Future<Item = Response<Body>, Error = hyper::Error> {
    let (parts, body) = req.into_parts();

    match (parts.method, parts.uri.path()) {
        (Method::POST, "/") => {
            let entire_body = body.concat2();
            let resp = entire_body.map(|body| {
                let body = Body::from(format!("Read {} bytes", body.len()));
                Response::new(body)
            });
            Either::A(resp)
        }
        _ => {
            let body = Body::from("Can only POST to /");
            let resp = future::ok(Response::new(body));
            Either::B(resp)
        }
    }
}

You could also convert the concatenated body (the value that the entire_body future resolves to, named body inside the map closure) into a Vec<u8> via to_vec() and then convert that to a String.

hyper 0.11 + Stream::fold

Similar to Iterator::fold, Stream::fold takes an accumulator (called init) and a function that operates on the accumulator and an item from the stream. The result of the function must be another future with the same error type as the original. The total result is itself a future.

fn fold<F, T, Fut>(self, init: T, f: F) -> Fold<Self, F, Fut, T>
where
    F: FnMut(T, Self::Item) -> Fut,
    Fut: IntoFuture<Item = T>,
    Self::Error: From<Fut::Error>,
    Self: Sized,
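
For intuition, here is the synchronous counterpart using Iterator::fold. It is a sketch that uses only the standard library: collect_chunks is a hypothetical helper, and the byte slices stand in for the chunks a body stream would yield. The only difference in the Stream version is that the closure returns a future of the accumulator instead of the accumulator itself.

```rust
/// Hypothetical helper: glue a sequence of byte chunks into one `Vec<u8>`,
/// using the same accumulator step a `Stream::fold` closure would use.
fn collect_chunks<'a, I>(chunks: I) -> Vec<u8>
where
    I: IntoIterator<Item = &'a [u8]>,
{
    chunks.into_iter().fold(Vec::new(), |mut acc, chunk| {
        // Append this chunk and pass the accumulator on to the next step.
        acc.extend_from_slice(chunk);
        acc
    })
}
```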

We can use a Vec as the accumulator. Body's Stream implementation yields Chunks. Chunk implements Deref<Target = [u8]>, so we can use that to append each chunk's data to the Vec.

extern crate futures; // 0.1.23
extern crate hyper;   // 0.11.27

use futures::{Future, Stream};
use hyper::{
    server::{Http, Request, Response, Service}, Post,
};

fn main() {
    let addr = "127.0.0.1:12346".parse().unwrap();

    let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
    println!(
        "Listening on http://{} with 1 thread.",
        server.local_addr().unwrap()
    );
    server.run().unwrap();
}

struct Echo;

impl Service for Echo {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = Box<futures::Future<Item = Response, Error = Self::Error>>;

    fn call(&self, req: Self::Request) -> Self::Future {
        match (req.method(), req.path()) {
            (&Post, "/") => {
                let f = req.body()
                    .fold(Vec::new(), |mut acc, chunk| {
                        acc.extend_from_slice(&*chunk);
                        futures::future::ok::<_, Self::Error>(acc)
                    })
                    .map(|body| Response::new().with_body(format!("Read {} bytes", body.len())));

                Box::new(f)
            }
            _ => panic!("Nope"),
        }
    }
}

You could also convert the Vec<u8> body to a String.
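
A minimal sketch of that conversion, using only the standard library. The helper names are hypothetical. Since the bytes come from a client, the first variant reports invalid UTF-8 as an error rather than panicking, and the second replaces invalid sequences instead.

```rust
use std::string::FromUtf8Error;

/// Hypothetical helper: interpret a fully collected body as UTF-8 text.
/// Reuses the allocation of `body`; fails if the bytes are not valid UTF-8.
fn body_to_string(body: Vec<u8>) -> Result<String, FromUtf8Error> {
    String::from_utf8(body)
}

/// Hypothetical helper: never fails, but substitutes U+FFFD for any
/// invalid byte sequence, so the result may differ from the input.
fn body_to_string_lossy(body: &[u8]) -> String {
    String::from_utf8_lossy(body).into_owned()
}
```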

Output

When called from the command line, we can see the result:

$ curl -X POST --data hello http://127.0.0.1:12346/
Read 5 bytes

Warning

All of these solutions allow a malicious end user to POST an infinitely sized file, which would cause the machine to run out of memory. Depending on the intended use, you may wish to establish some kind of cap on the number of bytes read, potentially writing to the filesystem at some breakpoint.
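
One way to picture such a cap is to make the accumulator step fallible. The sketch below uses only the standard library and a synchronous iterator in place of the body stream. BodyTooLarge, push_chunk, read_capped and the limit parameter are all hypothetical, and how to surface the error from an asynchronous fold depends on the Hyper version and is not shown.

```rust
/// Hypothetical error returned when the body grows past the limit.
#[derive(Debug, PartialEq)]
struct BodyTooLarge {
    limit: usize,
}

/// Hypothetical accumulator step with a size cap.
/// `limit` is an application-chosen maximum in bytes.
fn push_chunk(mut acc: Vec<u8>, chunk: &[u8], limit: usize) -> Result<Vec<u8>, BodyTooLarge> {
    // Check before copying so that we never hold more than `limit` bytes.
    if acc.len().saturating_add(chunk.len()) > limit {
        return Err(BodyTooLarge { limit });
    }
    acc.extend_from_slice(chunk);
    Ok(acc)
}

/// Synchronous stand-in for the stream: stops at the first oversized step.
fn read_capped<'a, I>(chunks: I, limit: usize) -> Result<Vec<u8>, BodyTooLarge>
where
    I: IntoIterator<Item = &'a [u8]>,
{
    chunks
        .into_iter()
        .try_fold(Vec::new(), |acc, chunk| push_chunk(acc, chunk, limit))
}
```

Because try_fold short-circuits on the first Err, the remaining chunks are never copied once the limit is exceeded.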

Rodmun answered 14/4, 2017 at 23:33 Comment(8)
Could you also explain why you have futures::future::ok() in the fold method, although you deleted FutureResult from type Future = ...? - Tuft
@Tuft because the closure given to fold needs to return a future itself: F: FnMut(T, Self::Item) -> Fut. This allows for the operation to itself take time. Since extend_from_slice is synchronous, we "lift" up the result using future::ok. This is pretty separate from the type Future = FutureResult, which is used as the return value of the handler (which I boxed out of laziness). - Rodmun
The Stream::fold(...) can be replaced with Stream::concat2() that does the same thing. Chunk is itself Extend so the result of concat2 will be a single Chunk that contains the whole body. - Muskellunge
@Muskellunge thanks! When I wrote this answer originally, concat2 didn't even exist! - Rodmun
What would be a technique to (for example) stop after 1MiB of input, using concat2? - Cannular
@Cannular "How do I apply a limit to the number of bytes read by futures::Stream::concat2?" - Rodmun
If you write data to the filesystem to avoid running out of memory, then you just allow an attacker to fill up your server's filesystems. Don't do that. - Crosspollinate
@M.Leonhard The answer says "you may wish to establish some kind of cap on the number of bytes read, potentially writing to the filesystem at some breakpoint." My suggestion is not to write to the filesystem instead of just using memory, but in addition to an existing cap. - Rodmun
E
38

Hyper 0.13 provides a body::to_bytes function for this purpose.

use hyper::body;
use hyper::{Body, Response};

pub async fn read_response_body(res: Response<Body>) -> Result<String, hyper::Error> {
    let bytes = body::to_bytes(res.into_body()).await?;
    Ok(String::from_utf8(bytes.to_vec()).expect("response was not valid utf-8"))
}
Energize answered 20/12, 2019 at 16:19 Comment(1)
Response implements HttpBody required by to_bytes, so no need to call into_body() explicitly. And Bytes implements AsRef<[u8]>, so you can avoid Vec allocation. - Dorso
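The allocation-free variant suggested in the comment above can be sketched against any AsRef<[u8]> type. The helper name as_text is hypothetical, and the sketch is exercised with Vec<u8> and slices from the standard library rather than with Bytes itself.

```rust
use std::str::Utf8Error;

/// Hypothetical helper: view a byte container as `&str` without copying.
/// Anything that implements `AsRef<[u8]>` works, for example `Vec<u8>`
/// or a plain `[u8]` slice.
fn as_text<B: AsRef<[u8]> + ?Sized>(bytes: &B) -> Result<&str, Utf8Error> {
    // Validates the bytes in place; the returned `&str` borrows from `bytes`.
    std::str::from_utf8(bytes.as_ref())
}
```

The returned string slice borrows from the buffer, so the buffer has to outlive it. If an owned String is needed afterwards, one copy is still required.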
T
1

For hyper 1.1.0, using collect() provided by http-body-util worked for me:

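use anyhow::Result;
use http_body_util::BodyExt;
use hyper::body::Incoming;
use hyper::Request;

async fn request_to_string(req: Request<Incoming>) -> Result<String> {
    // Collect every frame of the body, then flatten it into one contiguous buffer.
    let bytes = req.into_body().collect().await?.to_bytes();
    // Returns an error (instead of panicking) if the body is not valid UTF-8.
    Ok(String::from_utf8(bytes.into())?)
}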
Tola answered 13/2, 2024 at 16:1 Comment(0)
M
-1

Most of the answers on this topic are outdated or overly complicated. The solution is pretty simple:

/*
    WARNING for beginners!!! This use statement
    is important so we can later use .data() method!!!
*/
use hyper::body::HttpBody;

let my_vector: Vec<u8> = request.into_body().data().await.unwrap().unwrap().to_vec();
let my_string = String::from_utf8(my_vector).unwrap();

You can also use body::to_bytes, as @euclio answered. Both approaches are straightforward! Don't forget to replace the unwrap calls with proper error handling.

Mismanage answered 6/11, 2020 at 14:2 Comment(1)
data is documented as "Returns future that resolves to next data chunk, if any.", so I'm pretty sure that this answer is incorrect / incomplete. - Rodmun
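
To illustrate the concern in the comment above, here is a standard-library sketch in which a slice of byte slices stands in for a body that arrives in several chunks. Both function names are hypothetical. Taking only the next chunk loses everything after it, whereas draining all chunks yields the full body.

```rust
/// Mirrors a single "next chunk" call: returns only the first chunk, if any.
fn first_chunk_only(chunks: &[&[u8]]) -> Vec<u8> {
    chunks.first().map(|c| c.to_vec()).unwrap_or_default()
}

/// Mirrors reading until the body is exhausted: joins every chunk in order.
fn all_chunks(chunks: &[&[u8]]) -> Vec<u8> {
    chunks.concat()
}
```

For a body split into "hel" and "lo", the first function returns only "hel". The two agree only when the whole body happens to arrive as a single chunk.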
