How do I await a future inside a non-async method which was called from an async method within the context of a Tokio runtime?
I'm using Tokio 1.1 to do async things. I have an async main with #[tokio::main] so I'm already operating with a runtime.

main invokes a non-async method where I'd like to await a future (specifically, I'm collecting from a datafusion dataframe). This non-async method has a signature prescribed by a trait which returns a struct, not a Future<Struct>. As far as I'm aware, I can't mark it async.

If I try and call df.collect().await;, I get the

only allowed inside async functions and blocks

error from the compiler, pointing out that the method that I'm calling await within is not async.

If I try and block_on the future from a new runtime like this:

tokio::runtime::Builder::new_current_thread()
    .build()
    .unwrap()
    .block_on(df.collect());

I get a runtime panic:

Cannot start a runtime from within a runtime. This happens because a function (like block_on) attempted to block the current thread while the thread is being used to drive asynchronous tasks.

If I try futures::executor::block_on(df.collect()).unwrap();, I get a new runtime panic:

'not currently running on a Tokio 0.2.x runtime.'

which is weird because I'm using Tokio v1.1.

This feels harder than it should be. I'm within an async context, and it feels like the compiler should know that and allow me to call .await from within the method, since the only code path invokes this method from within an async block. Is there a simple way to do this that I'm missing?

Badinage answered 3/2, 2021 at 20:37 Comment(5)
Awaiting inside a sync function just doesn't fundamentally work. You can spawn a blocking task with tokio::task::spawn_blocking.Underwing
@Shepmaster My not-async method has a signature prescribed by a trait, and which returns a struct, not a Future<Struct>. As far as I'm aware, I can't mark it async. I'll take a closer look at that other q, and update the original if there's a difference. I think I tried thread::spawn, but can't quite remember why that was problemsome.Badinage
@Ibraheem don't think I saw spawn_blocking , I'll take a look and update. Thanks!Badinage
@IbraheemAhmed - spawn_blocking seems to return a JoinHandle which still needs to be await-ed - unless I'm missing something?Badinage

I'm within an async context, and it feels like the compiler should know that and allow me to call .await from within the method

It is fundamentally impossible to await inside of a synchronous function, whether or not you are within the context of a runtime. Each await is transformed into a yield point, and async functions are transformed into state machines that use these yield points to perform asynchronous computations. Without marking your function as async, this transformation is impossible.
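
As a rough illustration of what that transformation produces, the sketch below hand-writes a small state machine and polls it manually. The names TwoYields, NoopWaker and poll_by_hand are made up for this example, and real compiler output is more involved. The point is only that each Pending return is a yield point, and that something outside the future has to keep calling poll, which a plain synchronous function body has no way to express.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical hand-written equivalent of an `async fn` with two `.await`
/// points: every `Pending` return is a yield point.
struct TwoYields {
    polls: u32,
}

impl Future for TwoYields {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        self.polls += 1;
        if self.polls < 3 {
            // Yield: request another poll, then hand control back to the caller.
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            Poll::Ready(self.polls)
        }
    }
}

/// Waker that does nothing; sufficient when polling by hand in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls the future until it completes and records what each poll returned.
fn poll_by_hand() -> Vec<Poll<u32>> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(TwoYields { polls: 0 });
    let mut results = Vec::new();
    loop {
        let outcome = fut.as_mut().poll(&mut cx);
        results.push(outcome);
        if outcome.is_ready() {
            break;
        }
    }
    results
}
```

Calling poll_by_hand() returns two Pending results followed by Ready(3). A runtime such as Tokio plays the role of that loop, which is why .await only works where the compiler can generate the surrounding state machine.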

If I understand your question correctly, you have the following code:

#[tokio::main]
async fn main() {
    let foo = Foo {};
    foo.bar();
}

impl Trait for Foo {
    fn bar(df: DataFrame) -> Vec<Data> {
        // error: `await` is only allowed inside `async` functions and blocks
        df.collect().await
    }
}

The issue is that you cannot await df.collect from within bar, because it is not marked as async. If you can modify the signature of Trait, then you can make Trait::bar an async method with the workarounds mentioned in How can I define an async method in a trait?.
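
One of those workarounds is to have the trait method return a boxed future, which is roughly what the async-trait crate generates. A minimal sketch using only the standard library follows. Trait, Foo and collect are stand-ins for the types in the question, and the small block_on here is a hypothetical substitute for the Tokio runtime so that the example has no dependencies.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Boxed, type-erased future; an assumption modelled on what `async-trait` emits.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Stand-in trait: `bar` returns a future instead of the finished value.
trait Trait {
    fn bar(&self) -> BoxFuture<'_, Vec<u32>>;
}

struct Foo {
    rows: Vec<u32>,
}

impl Trait for Foo {
    fn bar(&self) -> BoxFuture<'_, Vec<u32>> {
        // Inside an `async` block, `.await` is allowed again.
        Box::pin(async move { collect(&self.rows).await })
    }
}

/// Stand-in for `df.collect()`.
async fn collect(rows: &[u32]) -> Vec<u32> {
    rows.to_vec()
}

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny executor used only to drive the example; not a replacement for Tokio.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

/// Calls the trait method from async code and returns its result.
fn demo() -> Vec<u32> {
    let foo = Foo { rows: vec![1, 2, 3] };
    block_on(async { foo.bar().await })
}
```

demo() returns vec![1, 2, 3]. Under Tokio you would drop block_on and simply write foo.bar().await from the async main.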

If you cannot change the signature of Trait, then you have a problem. Async functions should never spend a long time without reaching a .await. As explained in What is the best approach to encapsulate blocking I/O in future-rs?, you can use spawn_blocking when transitioning into non-async code:

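#[tokio::main]
async fn main() {
    let foo = Foo {};
    // Run the synchronous method on Tokio's blocking thread pool.
    tokio::task::spawn_blocking(move || foo.bar()).await.unwrap();
}

impl Trait for Foo {
    fn bar(df: DataFrame) -> Vec<Data> {
        // Still a compile error: bar is not async, so it cannot await.
        df.collect().await
    }
}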

Now you need a way to run df.collect to completion, without awaiting. You mentioned that you tried to create a nested runtime to solve this problem:

If I try and block_on the future from a new runtime ... I get a panic

However, Tokio does not allow you to create nested runtimes. You could create a new, independent runtime, as explained in How can I create a Tokio runtime inside another Tokio runtime. However, spawning an additional runtime would be inefficient.

Instead of spawning a new runtime, you can get a handle to the current runtime:

let handle = Handle::current();

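Enter the runtime context, keeping the returned guard alive for as long as you need the context (the context is exited when the guard is dropped):

let _guard = handle.enter();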

And then run the future to completion with futures::executor::block_on:

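use tokio::runtime::Handle;

impl Trait for Foo {
    fn bar(df: DataFrame) -> Vec<Data> {
        let handle = Handle::current();
        // Bind the guard: the runtime context is exited as soon as it is dropped.
        let _guard = handle.enter();
        futures::executor::block_on(df.collect())
    }
}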

Entering the tokio runtime context will solve the error you were getting previously:

If I try futures::executor::block_on(df.collect()).unwrap();, I get a new runtime panic not currently running on a Tokio 0.2.x runtime

I would urge you to try and avoid doing this if you can. The optimal solution would be to mark Trait::bar as async and await as normal. Any other solutions, including the ones mentioned above, involve blocking the current thread until the given future completes.

Credit @AliceRyhl for the explanation

Underwing answered 19/2, 2021 at 16:5 Comment(4)
Thanks - I wasn't aware that these were all possible. In the end, you were right - refactoring to make async was the right thing to do.Badinage
Great explanation. I'm new to Rust, and I had this problem in a situation where my program was essentially a single threaded application, but the 3rd party software I used had an asynchronous API. The problem arose when I wanted to make a wrapper for the asynchronous stream that made it compatible with code expecting something implementing the Read trait. The "hack" of using futures::executor::block_on(...) would fix that for me, but your explanation makes it seem like I should avoid it at all costs?Fogy
If your application is sync but you need to use an async library then using some form of block_on is fine. If your app is async, then blocking the current thread is almost never fine. You have to take care when interacting with blocking code by using spawn_blocking or similar.Underwing
Can you have multiple block_on / spawn_blocking task even with a multi-threaded runtime? I could be wrong but I feel like I have two tasks trying to block_on (1 HTTP server, and then 1 route in that HTTP server needing to do async stuff) and it doesn't work well/I can't figure it out.Realm
