How can I remove or otherwise ignore errors when processing a stream?

I have a long list of futures which I'd like to run using Stream::buffer_unordered/Stream::buffered. I combine this stream into a single future with for_each and then execute it all with Tokio. It is quite common that one of the futures will return an error. According to the documentation, for_each will stop when an error is returned.

How can I ignore or just print a message when these errors are returned and keep executing subsequent futures?

Here is general code similar to my situation:

use futures::stream;
use futures::stream::Stream;
use futures::future::err;
use futures::future::ok;
use tokio;

fn main() {
    let queries: Vec<u32> = (0..10).collect();
    let futures = queries.into_iter().map(move |num| {
        println!("Started {}", num);
        // Maybe throw error
        let future = match num % 3 {
            0 => ok::<u32, u32>(num),
            _ => err::<u32, u32>(num)
        };
        future
    });

    let stream = stream::iter_ok(futures);
    let num_workers = 8;
    let future = stream
        .buffer_unordered(num_workers)
        .map_err(|err| {
            println!("Error on {:?}", err);
        })
        .for_each(|n| {
            println!("Success on {:?}", n);
            Ok(())
        });

    tokio::runtime::run(future);
}

If you run this example, the stream stops executing early as soon as one of the futures returns an Err.

Gigot answered 8/2, 2019 at 14:15 Comment(2)
Hugohugon: You can use result.map_err(|err| { println!("error occurred : {:?}", err.to_string()); Ok(()) }); This way you will consume the error and log it.
Gigot: @AkinerAlkan This doesn't seem to work, as it still thinks Ok(()) is an Err: thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Ok(())'
  • Stream::map_err — provided with error values, it can convert the type but it leaves it as an error.

  • Stream::or_else — provided with error values, it can convert the error into a success, leaving success values unchanged.

  • Stream::then — provided with both success and error values and can do whatever you want.

Stream::map_err cannot convert errors into successes, so it's not useful here. Stream::or_else can, but only when the error value can be converted into the success type. Only Stream::then lets you handle both the success and the error case at once and change both types.
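As a rough synchronous analogy (not the futures API itself), the same three shapes can be seen on a plain std `Result`. This is only a sketch: the function names `with_map_err`, `with_or_else`, and `with_then` are made up for illustration, and the `match` stands in for what a closure passed to `Stream::then` would see for each item.

```rust
/// Like `map_err`: the error value changes, but the result stays an `Err`.
fn with_map_err(r: Result<u32, u32>) -> Result<u32, String> {
    r.map_err(|e| format!("failed on {}", e))
}

/// Like `or_else`: an error can be turned into a success, but only into
/// the existing success type (`u32` here). Successes pass through unchanged.
fn with_or_else(r: Result<u32, u32>) -> Result<u32, u32> {
    r.or_else(|e| Ok(e * 100))
}

/// Like `then`: the closure sees both cases and may return any type it likes.
fn with_then(r: Result<u32, u32>) -> Option<u32> {
    match r {
        Ok(n) => Some(n),
        Err(_) => None,
    }
}

fn main() {
    println!("{:?}", with_map_err(Err(1)));
    println!("{:?}", with_or_else(Err(2)));
    println!("{:?}", with_then(Err(4)));
}
```

Only the last shape can throw the error away entirely, which is what the question asks for.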

Stream::flatten can be used to convert a stream of streams into a single stream.

Combine this with the fact that Result can be treated as an iterator, and you can create this:

stream
    .then(|r| future::ok(stream::iter_ok::<_, ()>(r)))
    .flatten()

Regardless of whether the stream's item is Ok or Err, we convert it to an iterator and create a stream from it. An Ok yields one item and an Err yields none, so flattening the stream of streams drops the errors.
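The "Result as an iterator" trick can be tried without any futures at all. The sketch below uses only the standard library: `Result` implements `IntoIterator`, yielding one item for `Ok` and nothing for `Err`, so `Iterator::flatten` keeps the successes and drops the failures. The helper name `keep_successes` is hypothetical.

```rust
/// Drops every `Err` and unwraps every `Ok`, relying on `Result: IntoIterator`.
fn keep_successes(results: Vec<Result<u32, u32>>) -> Vec<u32> {
    results.into_iter().flatten().collect()
}

fn main() {
    // Same pattern as the question: multiples of 3 succeed, the rest fail.
    let results: Vec<Result<u32, u32>> = (0..10)
        .map(|n| if n % 3 == 0 { Ok(n) } else { Err(n) })
        .collect();

    println!("{:?}", keep_successes(results)); // prints [0, 3, 6, 9]
}
```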

If you wanted to print out errors, I'd use Stream::inspect_err:

stream.inspect_err(|err| println!("Error on {:?}", err))
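A synchronous sketch of the same "look at the error, then drop it" pattern, again with std iterators rather than streams. `Iterator::inspect` plays the role that `Stream::inspect_err` plays above; the function name `log_and_drop_errors` and the extra vector of observed errors are assumptions added so the behaviour is easy to check.

```rust
/// Prints each error as it passes by, then keeps only the successes.
/// Returns (successes, errors that were observed).
fn log_and_drop_errors(results: Vec<Result<u32, u32>>) -> (Vec<u32>, Vec<u32>) {
    let mut seen_errors = Vec::new();
    let successes: Vec<u32> = results
        .into_iter()
        // Observe each item without changing it.
        .inspect(|r| {
            if let Err(e) = r {
                println!("Error on {:?}", e);
                seen_errors.push(*e);
            }
        })
        // `Err` yields no items, so only the `Ok` values remain.
        .flatten()
        .collect();
    (successes, seen_errors)
}

fn main() {
    let (ok, failed) = log_and_drop_errors(vec![Ok(0), Err(1), Err(2), Ok(3)]);
    println!("successes: {:?}, errors: {:?}", ok, failed);
}
```

Inspecting before flattening matters: once the items are flattened, the errors are gone and there is nothing left to log.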

Complete code:

use futures::{
    future,
    stream::{self, Stream},
}; // 0.1.25
use tokio; // 0.1.14

fn main() {
    let stream = stream::iter_ok({
        (0..10).map(|num| {
            println!("Started {}", num);
            match num % 3 {
                0 => future::ok(num),
                _ => future::err(num),
            }
        })
    })
    .buffer_unordered(2);

    let stream = stream
        .inspect_err(|err| println!("Error on {:?}", err))
        .then(|r| future::ok(stream::iter_ok::<_, ()>(r)))
        .flatten();

    tokio::run({
        stream.for_each(|n| {
            println!("Success on {:?}", n);
            Ok(())
        })
    });
}

Output:

Started 0
Started 1
Success on 0
Started 2
Error on 1
Started 3
Error on 2
Started 4
Success on 3
Started 5
Error on 4
Started 6
Error on 5
Started 7
Success on 6
Started 8
Error on 7
Started 9
Error on 8
Success on 9
Bettyannbettye answered 8/2, 2019 at 17:2 Comment(2)
Gigot: Great answer that really explained the thought process of considering all available functions.
Salim: stream::iter_ok() was removed it seems. You can use something like stream::iter(res.into_iter()) instead.
