How do I iterate over a Vec of functions returning Futures in Rust?

Is it possible to loop over a Vec, calling a method that returns a Future on each, and build a chain of Futures, to be evaluated (eventually) by the consumer? Whether to execute the later Futures would depend on the outcome of the earlier Futures in the Vec.

To clarify:

I'm working on an application that can fetch data from an arbitrary set of upstream sources.

Requesting data would check with each of the sources, in turn. If the first source had an error (Err), or did not have the data available (None), then the second source would be tried, and so on.

Each source should be tried exactly once, and no source should be tried until all of the sources before have returned their results. Errors are logged, but otherwise ignored, passing the query to the next upstream data source.

I have some working code that does this for fetching metadata:

/// Attempts to read/write data to various external sources. These are
/// nested types, because a data source may exist as both a reader and a writer
struct StoreManager {
    /// Upstream data sources
    readers: Vec<Rc<RefCell<StoreRead>>>,
    /// Downstream data sinks
    writers: Vec<Rc<RefCell<StoreWrite>>>,
}

impl StoreRead for StoreManager {
    fn metadata(self: &Self, id: &Identifier) -> Box<Future<Option<Metadata>, Error>> {
        Box::new(ok(self.readers
            .iter()
            .map(|store| {
                executor::block_on(store.borrow().metadata(id)).unwrap_or_else(|err| {
                    error!("Error on metadata(): {:?}", err);
                    None
                })
            })
            .find(Option::is_some)
            .unwrap_or(None)))
    }
}

Aside from my unhappiness with all of the Box and Rc/RefCell nonsense, my real concern is with the executor::block_on() call. It blocks, waiting for each Future to return a result, before continuing to the next.

Given that it's possible to call fn_returning_future().or_else(|_| other_fn()) and so on, is it possible to build up a dynamic chain like this? Or is it a requirement to fully evaluate each Future in the iterator before moving to the next?

Divan answered 14/6, 2018 at 5:38 Comment(2)
You may want to use a Stream instead. Quote: "If Future is an asynchronous version of Result, then Stream is an asynchronous version of Iterator." and you want to iterate :) - Tablet
Please review how to create a minimal reproducible example and then edit your question to include it. We cannot tell what crates, types, traits, fields, etc. are present in the code. Ideally, produce something that reproduces your error on the Rust Playground. I believe you could remove all code specific to your application and keep only the pure concept of "loop over a Vec, calling a method that returns a Future on each, and build a chain of Futures, to be evaluated (eventually) by the consumer". - Spacing

You can use stream::unfold to convert a single value into a stream. In this case, we can use the IntoIter iterator as that single value.

use futures::{executor, stream, Stream, TryStreamExt}; // 0.3.4

type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;

async fn network_request(val: i32) -> Result<i32> {
    // Just for demonstration: thread::sleep blocks the executor thread,
    // so don't do this in a real program
    use std::{
        thread,
        time::{Duration, Instant},
    };
    thread::sleep(Duration::from_secs(1));
    println!("Resolving {} at {:?}", val, Instant::now());

    Ok(val * 100)
}

fn requests_in_sequence(vals: Vec<i32>) -> impl Stream<Item = Result<i32>> {
    stream::unfold(vals.into_iter(), |mut vals| async {
        let val = vals.next()?;
        let response = network_request(val).await;
        Some((response, vals))
    })
}

fn main() {
    let s = requests_in_sequence(vec![1, 2, 3]);
    executor::block_on(async {
        s.try_for_each(|v| async move {
            println!("-> {}", v);
            Ok(())
        })
        .await
        .expect("An error occurred");
    });
}

Output:

Resolving 1 at Instant { tv_sec: 6223328, tv_nsec: 294631597 }
-> 100
Resolving 2 at Instant { tv_sec: 6223329, tv_nsec: 310839993 }
-> 200
Resolving 3 at Instant { tv_sec: 6223330, tv_nsec: 311005834 }
-> 300

To ignore Err and None, you have to shuttle the Error over to the Item, making the Item type a Result<Option<T>, Error>:

use futures::{executor, stream, Stream, StreamExt}; // 0.3.4

type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;

async fn network_request(val: i32) -> Result<Option<i32>> {
    // Just for demonstration, don't do this in a real program
    use std::{
        thread,
        time::{Duration, Instant},
    };
    thread::sleep(Duration::from_secs(1));
    println!("Resolving {} at {:?}", val, Instant::now());

    match val {
        1 => Err("boom".into()),  // An error
        2 => Ok(None),            // No data
        _ => Ok(Some(val * 100)), // Success
    }
}

fn requests_in_sequence(vals: Vec<i32>) -> impl Stream<Item = Result<Option<i32>>> {
    stream::unfold(vals.into_iter(), |mut vals| async {
        let val = vals.next()?;
        let response = network_request(val).await;
        Some((response, vals))
    })
}

fn main() {
    executor::block_on(async {
        let s = requests_in_sequence(vec![1, 2, 3]);

        let s = s.filter_map(|v| async move { v.ok() });
        let s = s.filter_map(|v| async move { v });
        let mut s = s.boxed_local();

        match s.next().await {
            Some(v) => println!("First success: {}", v),
            None => println!("No successful requests"),
        }
    });
}

Output:

Resolving 1 at Instant { tv_sec: 6224229, tv_nsec: 727216392 }
Resolving 2 at Instant { tv_sec: 6224230, tv_nsec: 727404752 }
Resolving 3 at Instant { tv_sec: 6224231, tv_nsec: 727593740 }
First success: 300

> is it possible to build up a dynamic chain like this

Yes, by leveraging async functions:

use futures::executor; // 0.3.4

type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;

async fn network_request(val: i32) -> Result<Option<i32>> {
    // Just for demonstration, don't do this in a real program
    use std::{
        thread,
        time::{Duration, Instant},
    };
    thread::sleep(Duration::from_secs(1));
    println!("Resolving {} at {:?}", val, Instant::now());

    match val {
        1 => Err("boom".into()),  // An error
        2 => Ok(None),            // No data
        _ => Ok(Some(val * 100)), // Success
    }
}

async fn requests_in_sequence(vals: Vec<i32>) -> Result<i32> {
    let mut vals = vals.into_iter().peekable();

    while let Some(v) = vals.next() {
        match network_request(v).await {
            Ok(Some(v)) => return Ok(v),
            Err(e) if vals.peek().is_none() => return Err(e),
            Ok(None) | Err(_) => { /* Do nothing and try the next source */ }
        }
    }

    Err("Ran out of sources".into())
}

fn main() {
    executor::block_on(async {
        match requests_in_sequence(vec![1, 2, 3]).await {
            Ok(v) => println!("First success: {}", v),
            Err(e) => println!("No successful requests: {}", e),
        }
    });
}
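
The same loop also works when the Vec holds functions that return futures, as in the question's title. The following is only a minimal sketch using the standard library: `Source`, `BoxFuture`, `first_success`, `source`, and the busy-polling `block_on` are hypothetical names made up for illustration, not part of the futures crate. A real program would use a proper executor such as futures::executor::block_on.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type Error = Box<dyn std::error::Error>;
type BoxFuture<T> = Pin<Box<dyn Future<Output = Result<Option<T>, Error>>>>;
// Hypothetical source type: a function that starts one lookup each time it is called.
type Source<T> = Box<dyn Fn() -> BoxFuture<T>>;

// Tries each source in order and stops at the first `Some` value.
async fn first_success<T>(sources: Vec<Source<T>>) -> Option<T> {
    for source in sources {
        match source().await {
            Ok(Some(v)) => return Some(v),
            // The question logs errors at this point; this sketch just moves on.
            Ok(None) | Err(_) => continue,
        }
    }
    None
}

// Hypothetical helper: a source that counts its calls and yields a fixed outcome.
fn source(calls: Rc<Cell<u32>>, outcome: Result<Option<i32>, &'static str>) -> Source<i32> {
    Box::new(move || -> BoxFuture<i32> {
        let calls = Rc::clone(&calls);
        Box::pin(async move {
            calls.set(calls.get() + 1);
            outcome.map_err(|e| Error::from(e))
        })
    })
}

// A waker that does nothing, so the sketch needs no external crate.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Demo-only executor: polls in a loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Calling first_success(sources) only builds the future; no source runs until the caller polls it, so execution stays in the caller's hands. Given four sources that fail, return None, return Some(300), and return Some(400), in that order, driving the future yields Some(300) after three calls, and the fourth source is never tried.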

> is it a requirement to fully evaluate each Future in the iterator before moving to the next

Isn't that part of your own requirements? Emphasis mine:

> Requesting data would check with each of the sources, in turn. If the first source had an error (Err), or did not have the data available (None), then the second source would be tried

Spacing answered 14/6, 2018 at 14:15 Comment(2)
Re: is it a requirement to fully evaluate each Future in the iterator before moving to the next, yes, I realize it must be resolved, but I'd prefer the execution be managed by the caller, not inside my function. - Divan
How to collect values returned from these requests into a vector? - Kyd
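
On collecting the values into a vector: with the stream versions in the answer, StreamExt::collect or TryStreamExt::try_collect from the futures crate can gather the items. A plain async loop is another option. The sketch below assumes a simplified stand-in for network_request (the String error type and the failure on negative input are invented for the example) and a demo-only block_on built on the standard library; requests_into_vec is a hypothetical name.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-in for the answer's `network_request`; the `String` error type and
// the failure on negative input are assumptions made for this sketch.
async fn network_request(val: i32) -> Result<i32, String> {
    if val < 0 {
        return Err(format!("bad input {}", val));
    }
    Ok(val * 100)
}

// Runs the requests one after another and gathers the successes,
// stopping at the first error.
async fn requests_into_vec(vals: Vec<i32>) -> Result<Vec<i32>, String> {
    let mut out = Vec::with_capacity(vals.len());
    for val in vals {
        out.push(network_request(val).await?);
    }
    Ok(out)
}

// A waker that does nothing, so the sketch needs no external crate.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Demo-only executor: polls in a loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

With this stand-in, block_on(requests_into_vec(vec![1, 2, 3])) gives Ok(vec![100, 200, 300]), while an input containing a negative number returns the first error and issues no further requests.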
