How can I perform parallel asynchronous HTTP GET requests with reqwest?
The async example is useful, but being new to Rust and Tokio, I am struggling to work out how to do N requests at once, using URLs from a vector, and creating an iterator of the response HTML for each URL as a string.

How could this be done?

Janessa answered 26/6, 2018 at 13:46 Comment(0)

Concurrent requests

As of reqwest 0.11.14:

use futures::{stream, StreamExt}; // 0.3.27
use reqwest::Client; // 0.11.14
use tokio; // 1.26.0, features = ["macros"]

const CONCURRENT_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = &client;
            async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            }
        })
        .buffer_unordered(CONCURRENT_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        })
        .await;
}

stream::iter(urls)

stream::iter

Take a collection of strings and convert it into a Stream.

.map(|url| {

StreamExt::map

Run an asynchronous function on every element in the stream and transform the element to a new type.

let client = &client;
async move {

Take an explicit reference to the Client and move the reference (not the original Client) into an anonymous asynchronous block.

let resp = client.get(url).send().await?;

Start an asynchronous GET request using the Client's connection pool and wait for the request.

resp.bytes().await

Request and wait for the bytes of the response.

.buffer_unordered(CONCURRENT_REQUESTS);

StreamExt::buffer_unordered

Convert a stream of futures into a stream of those futures' values, executing the futures concurrently.

bodies
    .for_each(|b| {
        async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        }
    })
    .await;

StreamExt::for_each

Convert the stream back into a single future, printing out the amount of data received along the way, then wait for the future to complete.


Without bounded execution

If you wanted to, you could also convert an iterator into an iterator of futures and use future::join_all:

use futures::future; // 0.3.4
use reqwest::Client; // 0.10.1
use tokio; // 0.2.11

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = future::join_all(urls.into_iter().map(|url| {
        let client = &client;
        async move {
            let resp = client.get(url).send().await?;
            resp.bytes().await
        }
    }))
    .await;

    for b in bodies {
        match b {
            Ok(b) => println!("Got {} bytes", b.len()),
            Err(e) => eprintln!("Got an error: {}", e),
        }
    }
}

I'd encourage using the first example as you usually want to limit the concurrency, which buffer and buffer_unordered help with.

Parallel requests

Concurrent requests are generally good enough, but there are times when you need parallel requests. In that case, you need to spawn a task.

use futures::{stream, StreamExt}; // 0.3.8
use reqwest::Client; // 0.10.9
use tokio; // 0.2.24, features = ["macros"]

const PARALLEL_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let urls = vec!["https://api.ipify.org"; 2];

    let client = Client::new();

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = client.clone();
            tokio::spawn(async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            })
        })
        .buffer_unordered(PARALLEL_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(Ok(b)) => println!("Got {} bytes", b.len()),
                Ok(Err(e)) => eprintln!("Got a reqwest::Error: {}", e),
                Err(e) => eprintln!("Got a tokio::task::JoinError: {}", e),
            }
        })
        .await;
}

The primary differences are:

  • We use tokio::spawn to perform work in separate tasks.
  • We have to give each task its own reqwest::Client. As recommended, we clone a shared client to make use of the connection pool.
  • There's an additional error case when the task cannot be joined.


Earthworm answered 26/6, 2018 at 16:41 Comment(21)
Why did you use ` stream::iter_ok(urls).map(..)` ? The closure passed to .map() returns a future? We should be using and_then?Counterplot
Or alternatively, when to use .map() and when to use .and_then() .. both for stream? and for future?Counterplot
@Nawaz See the documentation for Stream::buffer_unordered: If this stream's item can be converted into a future. See also What is the difference between then, and_then and or_else in Rust futures?Earthworm
Thanks. I read that. I understand that. The .map() part is missing though.. Also, how does that translate to stream (not future). In case of stream, and_then is invoked for each successful item in the stream.Counterplot
Convert a stream of futures into a stream of those futures' values, executing the futures in parallel...What if I want a stream of streams of futures? I've got a HashMap<K, Vec<Url>> and I want to fetch each Url.. and eventually want HashMap<K, Vec<SomeObject>>Counterplot
I'm trying something like this: iter_ok(url_map).map(|k, urls| (k, iter_ok(urls).map(..))).buffered_unordered(N).... <lost here>Counterplot
@Nawaz please produce a minimal reproducible example then post a new question, linking to this one and describing why it's different. Try to reproduce your error on the Rust Playground if possible, otherwise in a brand new Cargo project, then include all of that information to reproduce it in your question, such as versions. There are Rust-specific MRE tips you can use to reduce your original code for posting here.Earthworm
Is there a reason a stream is used rather than joining a list of futures?Cateyed
@Cateyed StreamExt has the buffer and buffer_unordered methods, which you'll almost always want to use to prevent completely unbounded requests from taking up all of your system resources (e.g. open sockets). You don't need it thoughEarthworm
I think this example won't run parallel but only concurrent, based on this gist which I found in this post.Overkill
@AlexMoore-Niemi fair enough. Updated.Earthworm
Question - can we clone a single client instead of giving each task its own client?Shue
@MarkLodato I'm not sure I follow. When you clone a client you make another client, which you then give to each task as its own. The only difference is how the unique client is created. So I think the answer is "yes".Earthworm
@Earthworm I'm probably assuming too much but it looks like Client has an inner Arc. Would that imply that cloning it would clone the Arc and allow you to reuse the connection pool (or some wizardry) from the underlying ClientRef?Shue
@MarkLodato yes, that’s what the documentation recommends: The Client holds a connection pool internally, so it is advised that you create one and reuse it. You do not have to wrap the Client in an Rc or Arc to reuse it, because it already uses an Arc internally.Earthworm
@Earthworm Makes sense! Is there any value in updating this answer to using clone instead of newing up for improved copy-ability?Shue
@MarkLodato ah, now I see why you were asking! Yes, I've addressed that.Earthworm
@Earthworm which of these approaches uses the M:N threading I keep reading so much about? i.e. where the tokio runtime will create concurrent threads but also create parallel threads where necessaryBecker
@Becker the one under Parallel requests. tokio::spawn creates new tasks, and tasks can be moved across threads (when Tokio is using a multithreaded scheduler)Earthworm
In case you need to convert the whole thing into a vector, use bodies.collect::<Vec<_>>().await.Roble
You can save yourself the let client = &client; if you start out with let client = &Client::new();. The move on the async block is necessary for url, not for client.Strongarm

If possible for your problem, I recommend using async-std and rayon. They are both mature now and really easy to get started with, given the async { /* code here */ } scope bounds. You can also work into/alongside tokio with feature integration: https://docs.rs/async-std/1.10.0/async_std/#features

Saltsman answered 25/10, 2021 at 23:2 Comment(0)
