Executing a collection of futures sequentially
I have a collection of futures which I want to combine into a single future that gets them executed sequentially.

I looked into the futures_ordered function. It seems to return the results sequentially but the futures get executed concurrently.

I tried to fold the futures, combining them with and_then. However, that is tricky with the type system.

use futures::future::ok;

let tasks = vec![ok(()), ok(()), ok(())];

let combined_task = tasks.into_iter().fold(
    ok(()),                             // seed
    |acc, task| acc.and_then(|_| task), // accumulator
);

playground

This gives the following error:

error[E0308]: mismatched types
  --> src/main.rs:10:21
   |
10 |         |acc, task| acc.and_then(|_| task), // accumulator
   |                     ^^^^^^^^^^^^^^^^^^^^^^ expected struct `futures::FutureResult`, found struct `futures::AndThen`
   |
   = note: expected type `futures::FutureResult<_, _>`
              found type `futures::AndThen<futures::FutureResult<_, _>, futures::FutureResult<(), _>, [closure@src/main.rs:10:34: 10:42 task:_]>`

I'm probably approaching this wrong but I've run out of ideas.

Dorree answered 3/1, 2018 at 16:53 Comment(2)
Posting as a comment rather than an answer as I don't know how to fix it off the top of my head: since you provide ok(()) - which returns a FutureResult - as the initial value, fold expects you to return a FutureResult from each iteration of the closure. In other words, the inferred types are too specific.Drowse
a single future that gets them executed sequentially — why would you want to do such a thing? Since your futures don't depend on each other, there's no obvious reason to introduce forced serialization.Greenhouse

Combine iter_ok and Stream::for_each:

use futures::Stream;
use futures::future::ok;
use futures::stream::iter_ok;

let tasks = vec![ok(()), ok(()), ok(())];

let combined_task = iter_ok::<_, ()>(tasks).for_each(|f| f);

iter_ok produces a stream of the passed items that itself never fails (which is why you sometimes need to pin down the error type explicitly, as with ::<_, ()> here). The closure passed to for_each then returns a Future to be run for each item: here, simply the item that was passed in, since the items are already futures.

for_each then drives each returned future to completion before moving to the next one, like you wanted. It will also abort with the first error it encounters, and requires the inner futures to return () on success.

for_each itself returns a Future that will either fail (as described above) or resolve to () on completion.

For comparison, a micro-benchmark of the three approaches discussed on this page:

test tests::bench_variant_buffered ... bench:      22,356 ns/iter (+/- 1,816)
test tests::bench_variant_boxed    ... bench:       8,575 ns/iter (+/- 1,042)
test tests::bench_variant_for_each ... bench:       4,070 ns/iter (+/- 531)
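To see the same semantics without the futures 0.1 crate, here is a std-only sketch. Both block_on (a busy-poll executor) and run_sequentially are hypothetical helpers written only for this example: each task runs to completion before the next one starts, and the first Err short-circuits the whole run, just like for_each.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; good enough for demo futures that are
// always ready and never need to be woken.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Busy-poll a single future to completion (demo executor only).
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}

// Drive the tasks strictly in order; the first Err aborts the run, and
// the remaining tasks are never even started, mirroring for_each.
fn run_sequentially<F>(tasks: Vec<F>) -> Result<(), &'static str>
where
    F: Future<Output = Result<(), &'static str>>,
{
    block_on(async {
        for task in tasks {
            task.await?;
        }
        Ok(())
    })
}
```

This is only a demonstration executor; real code would use a proper runtime (or, on futures 0.1, the for_each combinator shown above).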
Hades answered 4/1, 2018 at 20:15 Comment(2)
This should be accepted instead of my answer, I completely missed that the asker actually wanted a Future at the end. I wrote my answer based upon the OP wanting a Stream.Disc
Accepted @Stefan's answer for the most optimal solution in my scenario. Also big thanks to Lukazoid and Shepmaster - I learned a lot of about the futures crate.Dorree

Stream has a function buffered which allows you to limit how many futures are polled concurrently.

If you have a collection of futures, you can create a stream and use buffered like so:

let tasks = vec![future1, future2];
let stream = ::futures::stream::iter_ok(tasks);
let mut when_result_ready = stream.buffered(1);

when_result_ready will now be a Stream implementation which only polls one future at a time and moves to the next once each future completes.

Update

Based on comments and profiling it appears buffered has a large overhead so another solution is to convert each Future to a Stream and flatten them:

iter_ok(tasks).map(|f| f.into_stream()).flatten()

flatten states that "each individual stream will get exhausted before moving on to the next." meaning no Future will be polled before the previous one is completed. In my local profiling this seems to be ~80% faster than the buffered approach.


Both of my answers above result in a Stream of results, where each source Future is polled sequentially and its result yielded. What the asker actually wanted, though, was a single Future at the end rather than the results of each source Future; in that case, the answer from Stefan is more useful and likely has better performance.

Disc answered 3/1, 2018 at 17:42 Comment(4)
Thanks! I especially like the granularity of being able to specify level of concurrency through buffer size.Dorree
I think buffered has a high overhead (there should be some Arc objects in the middle passed to with_notify every time an inner future is polled). Boxing might be cheaper than buffered(1).Hades
@Hades I did a quick bench and the boxed variant is roughly 30% faster than the buffered solution, with a lot lower std deviation. Although I'm not confident that any of the crucial parts weren't stripped away by optimizations. play.rust-lang.org/…Dorree
Actually, and_then(|f| f) (Stream::and_then) should be "the same" as buffered(1), just a lot faster.Hades

As mentioned in the comments, your types are too concrete.

You can envision the implementation of fold as doing something like this:

let (task0, task1, task2) = (ok(()), ok(()), ok(()));

let mut combined_task = ok(()); // seed
combined_task = combined_task.and_then(|_| task0); 
combined_task = combined_task.and_then(|_| task1); 
combined_task = combined_task.and_then(|_| task2); 

The variable combined_task needs to be updated in place with a new value of the same type. Since we start with ok(()), that's the type each step needs to return. However, the return type of and_then is different; it's an AndThen. In fact, AndThen is a generic type containing the closure and the underlying future, so each step will produce a distinct type with potentially a different size:

  1. FutureResult<()>
  2. AndThen<FutureResult<()>, closure0>
  3. AndThen<AndThen<FutureResult<()>, closure0>, closure1>
  4. AndThen<AndThen<AndThen<FutureResult<()>, closure0>, closure1>, closure2>

Instead, you can create a unified type by producing a boxed trait object at each step:

let (task0, task1, task2) = (ok(()), ok(()), ok(()));

let mut combined_task: Box<Future<Item = (), Error = ()>> = Box::new(ok(())); // seed
combined_task = Box::new(combined_task.and_then(|_| task0)); 
combined_task = Box::new(combined_task.and_then(|_| task1)); 
combined_task = Box::new(combined_task.and_then(|_| task2)); 

Now every step has the same type:

  1. Box<Future<Item = (), Error = ()>>
  2. Box<Future<Item = (), Error = ()>>
  3. Box<Future<Item = (), Error = ()>>
  4. Box<Future<Item = (), Error = ()>>

Converting back to the fold syntax:

let combined_task: Box<Future<Item = (), Error = ()>> =
    tasks.into_iter().fold(Box::new(ok(())), |acc, task| {
        Box::new(acc.and_then(|_| task))
    });
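The same "every step has a distinct anonymous type" issue shows up with plain closures, independent of futures. A small std-only sketch (chain_all is a hypothetical helper written for this example) of how boxing into a trait object gives every element, and the fold accumulator, one nameable type:

```rust
// Each closure literal has its own anonymous type; boxing them as
// Box<dyn Fn(i32) -> i32> unifies them into a single nameable type, so a
// Vec can hold them and fold can thread one accumulator type through.
fn chain_all(steps: Vec<Box<dyn Fn(i32) -> i32>>, seed: i32) -> i32 {
    // Every element (and the accumulator) is the same type, so fold type-checks.
    steps.into_iter().fold(seed, |acc, step| step(acc))
}
// chain_all with the steps |x| x + 1 and |x| x * 2 and seed 0 returns 2.
```

The trade-off is the same as with the boxed futures: one heap allocation and one dynamic dispatch per element, in exchange for a uniform type.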

Greenhouse answered 3/1, 2018 at 18:7 Comment(5)
Thanks! I was thinking of this but prefer to avoid heap allocations / runtime dispatch if possible. Although I'm not sure how the other buffered stream approach is internally implemented.Dorree
Hi @Greenhouse, can you please explain more about why adding Box fixes this issue? I ran into the same problem: if I specify that a function returns impl Future<Item=String, Error=Error>, the program fails to compile with expected futures::FutureResult, found futures::AndThen, but when I changed the return type to Box<Future<Item=String, Error=Error>>, everything compiled.Stipe
@Stipe I have a much longer explanation in another answer (now linked from this answer). Can you check that and see if it explains it adequately? If not, let me know what's missing and I'll try to update.Greenhouse
Thank you @Greenhouse, I read your post several times but can't understand it completely. My understanding of Box is that everything which is unsized needs a Box wrapper. In my case, the function returns an impl Future trait that was unsized, so Rust refuses to compile, right? I still feel I am misunderstanding something....Stipe
@Stipe not quite; unsized types don't really play into it here. The problem is that each step has a different type (which can have different sizes). impl Future is not unsized. The compiler knows the concrete type and size, although the programmer does not.Greenhouse

In my case (stable async/await) this code was very helpful:

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + 'static>> {
    let data = vec![1,2,3];

    stream::iter(data).for_each(|id| async move {
        let request = async { id }; // async io request
        let res = request.await;
        println!("res: {:?}", res);
        ()
    }).await;
    
    Ok(())
}

https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=ad5feaf0cbb3597730c22df2eaf4a606

Geotropism answered 17/7, 2020 at 10:45 Comment(0)

When I needed something like this (mainly because I was debugging an issue) I ended up writing a seq combinator composing loop_fn like so:

use futures::future::{self, loop_fn, Either, Loop};
use futures::{Future, IntoFuture};

fn seq<I>(
    i: I,
) -> impl Future<Item = Vec<<I::Item as IntoFuture>::Item>, Error = <I::Item as IntoFuture>::Error>
where
    I: IntoIterator,
    I::Item: IntoFuture,
{
    let iter = i.into_iter();
    loop_fn((vec![], iter), |(mut output, mut iter)| {
        let fut = if let Some(next) = iter.next() {
            Either::A(next.into_future().map(|v| Some(v)))
        } else {
            Either::B(future::ok(None))
        };

        fut.and_then(move |val| {
            if let Some(val) = val {
                output.push(val);
                Ok(Loop::Continue((output, iter)))
            } else {
                Ok(Loop::Break(output))
            }
        })
    })
}
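For comparison, with modern async/await the whole loop_fn state machine collapses into a plain for loop. A std-only sketch, where block_on is a hypothetical busy-poll executor included only so the example is self-contained:

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Minimal demo executor: busy-polls one future to completion.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}

// The async/await equivalent of seq above: await each future in turn and
// collect the results, so no future starts before its predecessor finishes.
async fn seq<I>(iter: I) -> Vec<<I::Item as Future>::Output>
where
    I: IntoIterator,
    I::Item: Future,
{
    let mut output = Vec::new();
    for fut in iter {
        output.push(fut.await);
    }
    output
}
```

The Either/Loop bookkeeping disappears because the compiler now generates the state machine from the for loop itself.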
Carl answered 7/8, 2018 at 0:27 Comment(0)

Just iterate over the vector, using .await inside a for loop:

use std::future;

#[tokio::main]
async fn main() {

    let futures = vec![future::ready(1), future::ready(2), future::ready(3)];

    let mut result = vec![];

    for future in futures {
        result.push(future.await);
    }

    dbg!(result);
    // output
    //[src/bin/future.rs:14] result = [
    // 1,
    // 2,
    // 3,
    //]

}
Olivenite answered 18/9, 2023 at 9:50 Comment(0)