I have a long list of futures which I'd like to run using Stream::buffer_unordered/Stream::buffered. I combine this stream into a single future with for_each and then execute it all with Tokio. It is quite common that one of the futures will return an error. According to the documentation, for_each will stop when an error is returned.
How can I ignore these errors, or just print a message when they are returned, and keep executing the subsequent futures?
Here is general code similar to my situation:
use futures::stream;
use futures::stream::Stream;
use futures::future::err;
use futures::future::ok;
use tokio;

fn main() {
    let queries: Vec<u32> = (0..10).collect();
    let futures = queries.into_iter().map(move |num| {
        println!("Started {}", num);
        // Maybe throw error
        let future = match num % 3 {
            0 => ok::<u32, u32>(num),
            _ => err::<u32, u32>(num),
        };
        future
    });
    let stream = stream::iter_ok(futures);
    let num_workers = 8;
    let future = stream
        .buffer_unordered(num_workers)
        .map_err(|err| {
            println!("Error on {:?}", err);
        })
        .for_each(|n| {
            println!("Success on {:?}", n);
            Ok(())
        });
    tokio::runtime::run(future);
}
If you try this example, the queue of futures will stop executing early when an Err is returned.
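To make the early stop concrete, here is a minimal std-only sketch that uses plain iterators as a stand-in for the stream. It is an analogy, not the futures API: query, stop_on_first_error and keep_going are hypothetical names invented for illustration. The first consumer short-circuits on the first Err, much as for_each does; the second receives each Result as an ordinary item and so visits every element. In futures 0.1 the corresponding move would presumably be to turn each Result into an Ok item before for_each, for example with the then adapter.

```rust
// Std-only analogy: iterators stand in for the stream, no async runtime involved.

/// Mirrors the rule from the question: multiples of 3 succeed, everything else fails.
fn query(num: u32) -> Result<u32, u32> {
    if num % 3 == 0 {
        Ok(num)
    } else {
        Err(num)
    }
}

/// Short-circuiting consumer: stops at the first Err it sees.
/// Returns the values processed so far and the error that ended the run, if any.
fn stop_on_first_error(nums: &[u32]) -> (Vec<u32>, Option<u32>) {
    let mut done = Vec::new();
    let outcome = nums.iter().map(|&n| query(n)).try_for_each(|r| {
        let n = r?; // an Err here ends the whole iteration
        done.push(n);
        Ok::<(), u32>(())
    });
    (done, outcome.err())
}

/// Tolerant consumer: every Result is handled as a normal item,
/// so a failure is recorded and the loop moves on to the next element.
fn keep_going(nums: &[u32]) -> (Vec<u32>, Vec<u32>) {
    let mut succeeded = Vec::new();
    let mut failed = Vec::new();
    for result in nums.iter().map(|&n| query(n)) {
        match result {
            Ok(n) => succeeded.push(n),
            Err(e) => failed.push(e),
        }
    }
    (succeeded, failed)
}

fn main() {
    let nums: Vec<u32> = (0..10).collect();

    let (done, first_err) = stop_on_first_error(&nums);
    println!("short-circuit: done {:?}, first error {:?}", done, first_err);

    let (succeeded, failed) = keep_going(&nums);
    println!("keep going: ok {:?}, failed {:?}", succeeded, failed);
}
```

The design point is that the error has to travel in the item position rather than the error position; once the consumer never sees an Err at the stream level, it has no reason to stop.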
result.map_err(|err| { println!("error occurred : {:?}", err.to_string()); Ok(()) });
This way you will consume the error and log it. – Hugohugon
Ok(()) is an Err --- thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Ok(())', – Gigot