How to tokio::join multiple tasks?

Suppose some futures are stored in a Vec whose length is determined at runtime, and you need to join these futures concurrently. How should you do it?

Judging by the example in the tokio::join documentation, one option is to manually handle each length the Vec could have (1, 2, 3, ...) and deal with each respective case.

extern crate tokio;

let mut v = Vec::new();
v.push(future_1);

// directly or indirectly you push many futures to the vector
 
v.push(future_N);

// one possible way to join these futures concurrently:

if v.len() == 0 {}
if v.len() == 1 { join!(v.pop().unwrap()); }
if v.len() == 2 { join!(v.pop().unwrap(), v.pop().unwrap()); }
// ...

I also noticed that, according to the documentation, tokio::join! takes a list as its parameter, but when I use syntax like

tokio::join!(v);

or something like

tokio::join![ v ] /  tokio::join![ v[..] ] / tokio::join![ v[..][..] ]

it just doesn't work.

So my question is: is there a more efficient way to join these futures, or am I missing something in what the documentation says?

Sousa answered 26/8, 2020 at 2:34 Comment(4)
tokio::join! takes a variable number of async expressions, not a single slice / vec of async values. - Introductory
I misinterpreted the tokio documentation while trying to solve this myself. I assumed that a block of async code (an async function, an async expression, or whatever) is treated as a future in Rust. My problem is how to join a varying number of these async expressions concurrently in a clearer way. - Sousa
Duplicate of "How to wait for a list of async function calls in rust?" - Piscatory
I believe the JoinSet answer is the direct, spot-on answer to your question. Hope one day you can come back and give the man his kudos. I was looking for something like this as well, having known there's join_all from futures; but at times one would want one dependency only. - Untidy
F
28

You can use futures::future::join_all to "merge" your collection of futures into a single future that resolves when all of the subfutures resolve.

Fluoroscope answered 26/8, 2020 at 4:10 Comment(2)
Thanks, that solved my problem. I didn't find the symmetrical one, "join_any(...)", though. Is it somewhere else, or absent? - Krusche
Found docs.rs/tokio/latest/tokio/macro.select.html, but it cancels the other branches when the first completes... - Krusche
F
40

join_all and try_join_all, as well as the more versatile FuturesOrdered and FuturesUnordered utilities from the same futures crate, are polled as a single combined future, that is, within a single task. This is probably fine if the constituent futures are simple and not often concurrently ready to perform work, but there are two potential problems with combining futures this way. First, you won't be able to make use of CPU parallelism with the multi-threaded runtime, because a single task is polled on only one thread at a time. Second, this carries a possibility of deadlocks if the futures use shared synchronization primitives. If these concerns do arise, consider spawning the individual futures as separate tasks and waiting on the tasks to finish.

Tokio 1.21.0 or later: JoinSet

With recent Tokio releases, you can use JoinSet to get the maximum flexibility, including the ability to abort all tasks. The tasks in the set are also aborted when JoinSet is dropped.

use tokio::task::JoinSet;

let mut set = JoinSet::new();

for fut in v {
    set.spawn(fut);
}

while let Some(res) = set.join_next().await {
    let out = res?;
    // ...
}

Older API

Spawn tasks with tokio::spawn and wait on the join handles:

use futures::future;

// ...

let outputs = future::try_join_all(v.into_iter().map(tokio::spawn)).await?;

You can also use the FuturesOrdered and FuturesUnordered combinators to process the outputs asynchronously in a stream:

use futures::stream::FuturesUnordered;
use futures::prelude::*;

// ...

let mut completion_stream = v.into_iter()
    .map(tokio::spawn)
    .collect::<FuturesUnordered<_>>();
while let Some(res) = completion_stream.next().await {
    // ...    
}

One caveat with waiting for tasks this way is that the tasks are not cancelled when the future (e.g. an async block) that has spawned the task and possibly owns the returned JoinHandle gets dropped. The JoinHandle::abort method needs to be used to explicitly cancel the task.

Format answered 3/10, 2021 at 11:51 Comment(0)
O
9

A full example:

use std::time::Duration;

use futures::stream::FuturesUnordered;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let tasks = (0..5).map(|i| tokio::spawn(async move {
        sleep(Duration::from_secs(1)).await; // simulate some work
        i * 2
    })).collect::<FuturesUnordered<_>>();

    let result = futures::future::join_all(tasks).await;
    println!("{:?}", result); // [Ok(8), Ok(6), Ok(4), Ok(2), Ok(0)]
}


Octamerous answered 12/8, 2022 at 15:13 Comment(0)
