Calling async function inside an iterator
I want to await an async function inside a closure used in an iterator. The function requiring the closure is called inside a struct implementation. I can't figure out how to do this.

This code simulates what I'm trying to do:

struct MyType {}

impl MyType {
    async fn foo(&self) {
        println!("foo");

        (0..2).for_each(|v| {
            self.bar(v).await;
        });
    }

    async fn bar(&self, v: usize) {
        println!("bar: {}", v);
    }
}

#[tokio::main]
async fn main() {
    let mt = MyType {};
    mt.foo().await;
}

Obviously, this will not work since the closure is not async, giving me:

error[E0728]: `await` is only allowed inside `async` functions and blocks
 --> src/main.rs:8:13
  |
7 |         (0..2).for_each(|v| {
  |                         --- this is not `async`
8 |             self.bar(v).await;
  |             ^^^^^^^^^^^^^^^^^ only allowed inside `async` functions and blocks

After looking for an answer on how to call an async function from a non-async one, I ended up with this:

tokio::spawn(async move {
    self.bar(v).await;
});

But now I'm hitting lifetime issues instead:

error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
 --> src/main.rs:4:18
  |
4 |     async fn foo(&self) {
  |                  ^^^^^
  |                  |
  |                  this data with an anonymous lifetime `'_`...
  |                  ...is captured here...
...
8 |             tokio::spawn(async move {
  |             ------------ ...and is required to live as long as `'static` here

This also doesn't surprise me, since from what I understand the Rust compiler cannot know how long a spawned task will live. Given this, the task spawned with tokio::spawn might outlive the MyType value it borrows.

The first fix I came up with was to make bar an associated function, copy everything I need in my closure, pass it by value to bar, and call it with MyType::bar(copies_from_self). But this is getting ugly since there's a lot of copying. It also feels like a workaround for not knowing how lifetimes work.

I was instead trying to use futures::executor::block_on which works for simple tasks like the one in this post:

(0..2).for_each(|v| {
    futures::executor::block_on(self.bar(v));
});

But when I put this into my real-world code, where I use a third-party library [1] that also uses tokio, things no longer work. After reading the documentation, I realise that #[tokio::main] is a macro that eventually wraps everything in block_on, so doing this results in nested block_on calls. This might be the reason why one of the async methods called in bar just stops working without any error or logging (it works without block_on, so it shouldn't be anything in the code itself). I reached out to the authors, who said that I could use for_each(|i| async move { ... }), which made me even more confused.

(0..2).for_each(|v| async move {
    self.bar(v).await;
});

This results in the compilation error:

expected `()`, found opaque type

which I think makes sense since I'm now returning a future and not (). My naive approach to this was to try and await the future with something like this:

(0..2).for_each(|v| {
    async move {
        self.bar(v).await;
    }
    .await
});

But that takes me back to square one, resulting in the following compilation error, which I also think makes sense, since I'm back to using await in the closure, which is sync:

`await` is only allowed inside `async` functions and blocks

This discovery also makes it hard for me to make use of answers such as the ones found here and here.

The question after all this cargo cult programming is basically: is it possible to call my async function from the closure in an iterator (preferably without spawning a task, to avoid lifetime problems), and if so, how? If this is not possible, what would an idiomatic implementation look like?


[1] This is the library/method used

Brahmi answered 24/1, 2021 at 14:14 Comment(0)

Iterator::for_each expects a synchronous closure, thus you can't use .await in it (not directly at least), nor can you return a future from it.

One solution is to just use a for loop instead of .for_each:

for v in 0..2 {
    self.bar(v).await;
}
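As a rough, self-contained sketch of the for loop variant: the struct and method names follow the question, but bar is changed here to return its argument so the result can be checked, and block_on is a hypothetical, hand-rolled stand-in for a real runtime (in the question, #[tokio::main] plays that role). It only works because these futures never actually wait on anything.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct MyType {}

impl MyType {
    // Same shape as `foo` in the question, but it returns the collected
    // values (an assumption made here so the result can be inspected).
    async fn foo(&self) -> Vec<usize> {
        let mut seen = Vec::new();
        // The loop body is part of the async fn itself, so `.await` is
        // allowed and `&self` can be borrowed without any `'static` bound.
        for v in 0..2 {
            seen.push(self.bar(v).await);
        }
        seen
    }

    async fn bar(&self, v: usize) -> usize {
        println!("bar: {}", v);
        v
    }
}

// Hypothetical helper: a waker that does nothing when woken.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical helper: polls a future in a loop until it is ready.
// Only suitable for futures that never wait on external events.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Calling block_on(mt.foo()) on a MyType value mt drives the loop to completion; with tokio you would instead write mt.foo().await inside #[tokio::main].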

The more general approach is to use streams instead of iterators, since those are the asynchronous equivalent (and the equivalent methods on streams are typically asynchronous as well). This would work not only for for_each but for most other iterator methods:

use futures::prelude::*;

futures::stream::iter(0..2)
    .for_each(|v| async move {
        self.bar(v).await;
    })
    .await;
Guerdon answered 24/1, 2021 at 14:39 Comment(3)
Thank you so much! The actual iterator method is not for_each but try_for_each. This means I cannot really change to a for loop nor use futures::stream::iter. Given this, do you see any other solution than using tokio::spawn and copying all the data I need from self? - Brahmi
I realise now that there's maybe a way to use TwitterStream other than try_for_each? I will try to see if I can figure something out. - Brahmi
Your answer made me read about streams and realise I was working with a stream. I changed stream.try_for_each(|m| {}) to while let Some(x) = stream.try_next().await.unwrap() {}, which meant I no longer needed a closure. Thank you so much! - Brahmi
