Spawn non-static future with Tokio

I have an async method that should execute some futures in parallel, and only return after all futures have finished. However, it is passed some data by reference that does not live as long as 'static (it will be dropped at some point in the main method). Conceptually, it's similar to this (Playground):

use std::time::Duration;
use tokio::spawn;
use tokio::task::JoinHandle;
use tokio::time::delay_for;

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in array {
        let task = spawn(do_sth(i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
}

#[tokio::main]
async fn main() {
    parallel_stuff(&[3, 1, 4, 2]).await;
}

Now, tokio wants futures that are passed to spawn to be valid for the 'static lifetime, because I could drop the handle without the future stopping. That means that my example above produces this error message:

error[E0759]: `array` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
  --> src/main.rs:12:25
   |
12 | async fn parallel_stuff(array: &[u64]) {
   |                         ^^^^^  ------ this data with an anonymous lifetime `'_`...
   |                         |
   |                         ...is captured here...
...
15 |         let task = spawn(do_sth(i));
   |                    ----- ...and is required to live as long as `'static` here

So my question is: How do I spawn futures that are only valid for the current context that I can then wait until all of them completed?

Heathenry answered 12/12, 2020 at 20:56 Comment(8)
Does this answer your question? Spawning tasks with non-static lifetimes in tokio – Arcature
Why not just copy the u64s? You don't have to use references at all here. – Odilia
Related: async_scoped – Arcature
Also, things passed to spawn do not need to have a 'static lifetime, they only need to be bounded by a 'static lifetime. This is a common Rust lifetime misconception. – Odilia
@IbraheemAhmed no it does not, because that question spawns tasks without waiting for them at the end of the method. – Heathenry
@Odilia Because I'm actually passing down non-copyable references; this is just a broken-down example. If I had to use copyable types, I could use Arc<RwLock<NotActuallyU64>>, but maybe there's a better solution that doesn't involve locking. – Heathenry
Also, my type is very well 'static (and therefore valid for the 'static lifetime), but the reference does not live as long as 'static, so I'm not sure if that link applies. Anyways, the data that is being referenced is destroyed in the main method at some point, like I said in the question. – Heathenry
Since in this case the answer is the same for Tokio 0.2 and 0.3, it would make sense to modify your question to apply to both versions of Tokio. – Argumentative

It is not possible to spawn a non-'static future from async Rust. This is because any async function might be cancelled at any time, so there is no way to guarantee that the caller really outlives the spawned tasks.

It is true that there are various crates that allow scoped spawns of async tasks, but those crates cannot be used from async code. What they do allow is spawning scoped async tasks from non-async code. This doesn't run into the problem above, because the non-async code that spawned them cannot be cancelled at any time, as it is not async.

Generally there are two approaches to this:

  1. Spawn a 'static task by using Arc rather than ordinary references.
  2. Use the concurrency primitives from the futures crate instead of spawning.

To spawn a 'static task and use Arc, you must have ownership of the values in question. Since your function took its argument by reference, you cannot use this technique without cloning the data.

use std::sync::Arc;
use std::time::Duration;
use tokio::spawn;
use tokio::task::JoinHandle;
use tokio::time::delay_for;

async fn do_sth(with: Arc<[u64]>, idx: usize) {
    delay_for(Duration::new(with[idx], 0)).await;
    println!("{}", with[idx]);
}

async fn parallel_stuff(array: &[u64]) {
    // Make a clone of the data so we can share it across tasks.
    let shared: Arc<[u64]> = Arc::from(array);

    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in 0..shared.len() {
        // Cloning an Arc does not clone the data.
        let shared_clone = shared.clone();
        let task = spawn(do_sth(shared_clone, i));
        tasks.push(task);
    }
    for task in tasks {
        task.await.unwrap();
    }
}

Note that if you have a mutable reference to the data, and the data is Sized, i.e. not a slice, it is possible to temporarily take ownership of it.

use std::sync::Arc;
use std::time::Duration;
use tokio::spawn;
use tokio::task::JoinHandle;
use tokio::time::delay_for;

async fn do_sth(with: Arc<Vec<u64>>, idx: usize) {
    delay_for(Duration::new(with[idx], 0)).await;
    println!("{}", with[idx]);
}

async fn parallel_stuff(array: &mut Vec<u64>) {
    // Swap the array with an empty one to temporarily take ownership.
    let vec = std::mem::take(array);
    let shared = Arc::new(vec);

    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    // Iterate over `shared`, not `array`, which is empty after the take.
    for i in 0..shared.len() {
        // Cloning an Arc does not clone the data.
        let shared_clone = shared.clone();
        let task = spawn(do_sth(shared_clone, i));
        tasks.push(task);
    }
    for task in tasks {
        task.await.unwrap();
    }

    // Put the vector back where we took it from.
    // This works because there is only one Arc left.
    *array = Arc::try_unwrap(shared).unwrap();
}

Another option is to use the concurrency primitives from the futures crate. These have the advantage of working with non-'static data, but the disadvantage that the tasks will not be able to run on multiple threads at the same time.

For many workflows this is perfectly fine, as async code should spend most of its time waiting for IO anyway.

One approach is to use FuturesUnordered. This is a special collection that can store many different futures, and it has a next function that runs all of them concurrently and returns once the first of them finishes. (The next function is only available when StreamExt is imported.)

You can use it like this:

use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::delay_for;

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    let mut tasks = FuturesUnordered::new();
    for i in array {
        let task = do_sth(i);
        tasks.push(task);
    }
    // This loop runs everything concurrently, and waits until they have
    // all finished.
    while let Some(()) = tasks.next().await { }
}

Note: The FuturesUnordered must be declared after the shared value. Variables are dropped in reverse declaration order, so declaring it first would let the borrows inside it outlive the data, and you would get a borrow error.


Another approach is to use a Stream. With streams, you can use buffer_unordered. This is a utility that uses FuturesUnordered internally.

use futures::stream::StreamExt;
use std::time::Duration;
use tokio::time::delay_for;

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    // Create a stream going through the array.
    futures::stream::iter(array)
    // For each item in the stream, create a future.
        .map(|i| do_sth(i))
    // Run at most 10 of the futures concurrently.
        .buffer_unordered(10)
    // Since Streams are lazy, we must use for_each or collect to run them.
    // Here we use for_each and do nothing with the return value from do_sth.
        .for_each(|()| async {})
        .await;
}

Note that in both cases, importing StreamExt is important as it provides various methods that are not available on streams without importing the extension trait.

Argumentative answered 14/12, 2020 at 10:35 Comment(0)

The existing answer boils down to:

It is possible to "spawn" a non-static future, as long as it's constrained to run in the same thread as the caller.

This left me unsatisfied. On the surface it looks like it should be possible to fully spawn a scoped future, the same way it is possible to spawn a scoped thread. It turns out Tokio explored this idea under the name of structured concurrency. Unfortunately, they weren't able to make it work, mainly because (if I understand correctly) it's currently not possible to enforce the scoping in a non-blocking, idiomatic way. This comment explains it in more detail.

Rejection answered 18/7, 2023 at 19:2 Comment(0)

For code that uses threads for parallelism, it is possible to avoid copying by extending a lifetime with transmute. An example:

fn main() {
    let now = std::time::Instant::now();
    let string = format!("{now:?}");
    println!(
        "{now:?} has length {}",
        parallel_len(&[&string, &string]) / 2
    );
}

fn parallel_len(input: &[&str]) -> usize {
    // SAFETY: this variable needs to be static, because it is passed into a thread,
    // but the thread does not live longer than this function, because we wait for
    // it to finish by calling `join` on it.
    let input: &[&'static str] = unsafe { std::mem::transmute(input) };
    let mut threads = vec![];
    for &txt in input {
        threads.push(std::thread::spawn(move || txt.len()));
    }
    threads.into_iter().map(|t| t.join().unwrap()).sum()
}

It seems reasonable that this should also work for asynchronous code, but I do not know enough about that to say for sure.

Listen answered 8/4, 2022 at 8:30 Comment(0)