The async example is useful, but being new to Rust and Tokio, I am struggling to work out how to do N requests at once, using URLs from a vector, and creating an iterator of the response HTML for each URL as a string.
How could this be done?
As of reqwest 0.11.14:

```rust
use futures::{stream, StreamExt}; // 0.3.27
use reqwest::Client; // 0.11.14
use tokio; // 1.26.0, features = ["macros"]

const CONCURRENT_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = &client;
            async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            }
        })
        .buffer_unordered(CONCURRENT_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        })
        .await;
}
```
- `stream::iter(urls)`: take a collection of strings and convert it into a `Stream`.
- `.map(|url| { ... })`: run an asynchronous function on every element in the stream and transform the element to a new type.
- `let client = &client; async move { ... }`: take an explicit reference to the `Client` and move the reference (not the original `Client`) into an anonymous asynchronous block.
- `let resp = client.get(url).send().await?;`: start an asynchronous GET request using the `Client`'s connection pool and wait for the request.
- `resp.bytes().await`: request and wait for the bytes of the response.
- `.buffer_unordered(CONCURRENT_REQUESTS);`: convert a stream of futures into a stream of those futures' values, executing the futures concurrently.
- `bodies.for_each(|b| async { ... }).await;`: convert the stream back into a single future, printing out the amount of data received along the way, then wait for the future to complete.
If you wanted to, you could also convert an iterator into an iterator of futures and use `future::join_all`:

```rust
use futures::future; // 0.3.4
use reqwest::Client; // 0.10.1
use tokio; // 0.2.11

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = future::join_all(urls.into_iter().map(|url| {
        let client = &client;
        async move {
            let resp = client.get(url).send().await?;
            resp.bytes().await
        }
    }))
    .await;

    for b in bodies {
        match b {
            Ok(b) => println!("Got {} bytes", b.len()),
            Err(e) => eprintln!("Got an error: {}", e),
        }
    }
}
```
I'd encourage using the first example, as you usually want to limit the concurrency, which `buffered` and `buffer_unordered` help with.
Concurrent requests are generally good enough, but there are times where you need parallel requests. In that case, you need to spawn a task.
```rust
use futures::{stream, StreamExt}; // 0.3.8
use reqwest::Client; // 0.10.9
use tokio; // 0.2.24, features = ["macros"]

const PARALLEL_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let urls = vec!["https://api.ipify.org"; 2];

    let client = Client::new();

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = client.clone();
            tokio::spawn(async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            })
        })
        .buffer_unordered(PARALLEL_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(Ok(b)) => println!("Got {} bytes", b.len()),
                Ok(Err(e)) => eprintln!("Got a reqwest::Error: {}", e),
                Err(e) => eprintln!("Got a tokio::JoinError: {}", e),
            }
        })
        .await;
}
```
The primary differences are:

- Using `tokio::spawn` to perform work in separate tasks.
- Passing an owned `reqwest::Client` into each task. As recommended, we clone a shared client to make use of the connection pool.
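reqwest's documentation notes that `Client` already uses an `Arc` internally, which is why `clone` is cheap and why the clones share one connection pool. The same idea with a hypothetical std-only handle type (`Handle` is an illustration, not a reqwest type):

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for reqwest::Client: cloning bumps a refcount
// and shares the inner state (for Client, the connection pool).
#[derive(Clone)]
struct Handle {
    inner: Arc<String>,
}

fn main() {
    let handle = Handle { inner: Arc::new("pool".to_string()) };

    let mut joins = Vec::new();
    for _ in 0..2 {
        let handle = handle.clone(); // cheap: no new pool is created
        joins.push(thread::spawn(move || handle.inner.len()));
    }
    for j in joins {
        assert_eq!(j.join().unwrap(), 4);
    }
}
```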
When to use `.map()` and when to use `.and_then()`.. both for stream? and for future? – Counterplot

`Stream::buffer_unordered`: "If this stream's item can be converted into a future." See also What is the difference between `then`, `and_then` and `or_else` in Rust futures? – Earthworm

The `.map()` part is missing though.. Also, how does that translate to `stream` (not `future`)? In case of `stream`, `and_then` is invoked for each successful item in the stream. – Counterplot

What about a stream of stream of futures? I've `HashMap<K, Vec<Url>>` and I want to fetch each `Url`.. and eventually want `HashMap<K, Vec<SomeObject>>` – Counterplot

`iter_ok(url_map).map(|k, urls| (k, iter_ok(urls).map(..))).buffered_unordered(N)`.... <lost here> – Counterplot

`StreamExt` has the `buffer` and `buffer_unordered` methods, which you'll almost always want to use to prevent completely unbounded requests from taking up all of your system resources (e.g. open sockets). You don't need it though – Earthworm

Why `clone` instead of `new`ing up, for improved copy-ability? – Shue

`tokio::spawn` creates new tasks, and tasks can be moved across threads (when Tokio is using a multithreaded scheduler) – Earthworm

`bodies.collect::<Vec<_>>().await`. – Roble

You can skip the `let client = &client;` if you start out with `let client = &Client::new();`. The `move` on the async block is necessary for `url`, not for `client`. – Strongarm

If possible for your problem I recommend using std async and rayon. They are both mature now and really easy to get started with, given the `async { /* code here */ }` scope bounds in std. You can also work into/alongside tokio with feature integration: https://docs.rs/async-std/1.10.0/async_std/#features
`.map()` returns a future? We should be using `and_then`? – Counterplot

© 2022 - 2025 — McMap. All rights reserved.