Rust Async Drop

I'm facing a scenario where I need to run async code from the drop handler of an object. The whole application runs in a tokio async context, so I know that the drop handler is called with an active tokio Runtime, but unfortunately drop itself is a sync function.

Ideally, I'd like a solution that works on both multi-thread and current-thread runtimes, but if that doesn't exist, then I'm ok with a solution that blocks the dropping thread and relies on other threads to drive the futures.

I considered multiple options, but I'm not sure which approach is best, and I don't fully understand their trade-offs. For these examples, let's assume my class has an async terminate(&mut self) function that I would like to be called from drop().

struct MyClass;
impl MyClass {
    async fn terminate(&mut self) {}
}

Option 1: tokio::runtime::Handle::block_on

impl Drop for MyClass {
    fn drop(&mut self) {
        tokio::runtime::Handle::current().block_on(self.terminate());
    }
}

This seems to be the most straightforward approach, but unfortunately it panics with

Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.

see playground

I'm a bit confused by this since I thought Handle::block_on would use the currently running runtime but it seems this tries to start a new runtime? What is going on here?

Also, according to the documentation of Handle::block_on, this cannot drive IO threads. So I guess blocking this thread is a risk - if too many objects are destructed at the same time, each blocking a thread, and those futures wait for IO work, then this will deadlock.

Option 2: futures::executor::block_on

impl Drop for MyClass {
    fn drop(&mut self) {
        futures::executor::block_on(self.terminate());
    }
}

see playground

This seems to work fine. If I understand this correctly, then it spawns a new non-tokio executor on the current thread and has that thread drive the future. Is this an issue? Does this cause conflicts between the running tokio executor and the new futures executor?

Also, can this actually drive IO threads, avoiding the issue of option 1? Or can it happen that those IO threads are still waiting on the tokio executor?

Option 3: tokio::task::spawn with futures::executor::block_on

impl Drop for MyClass {
    fn drop(&mut self) {
        let task = tokio::task::spawn(self.terminate());
        futures::executor::block_on(task);
    }
}

see playground

This should have the tokio runtime drive the termination future, while the futures executor only blocks the current thread until the tokio runtime has finished. Is this safer than option 2, and does it cause fewer conflicts between the runtimes? Unfortunately, this ran into a lifetime issue I couldn't figure out:

error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
   --> src/main.rs:8:44
    |
7   |     fn drop(&mut self) {
    |             --------- this data with an anonymous lifetime `'_`...
8   |         let task = tokio::task::spawn(self.terminate());
    |                                       ---- ^^^^^^^^^
    |                                       |
    |                                       ...is used here...
    |
note: ...and is required to live as long as `'static` here
   --> src/main.rs:8:20
    |
8   |         let task = tokio::task::spawn(self.terminate());
    |                    ^^^^^^^^^^^^^^^^^^
note: `'static` lifetime requirement introduced by this bound
   --> /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:28
    |
127 |         T: Future + Send + 'static,
    |                            ^^^^^^^

I also tried to fix this with LocalSet but couldn't get it to work. Any way to make this work?

Option 3b

I was, however, able to make it work if I make terminate() take self by value and wrap MyClass into a Wrapper. Not pretty but maybe better than Option 2 because it uses the tokio runtime to drive the future?

struct MyClass;
impl MyClass {
    async fn terminate(self) {}
}

struct Wrapper(Option<MyClass>);

impl Drop for Wrapper {
    fn drop(&mut self) {
        if let Some(v) = self.0.take() {
            let task = tokio::task::spawn(v.terminate());
            futures::executor::block_on(task).unwrap();
        }
    }
}

see playground

Is this a good approach? Is it actually important that the tokio runtime drives the drop future or is the simpler Option 2 better? Any ways to make option 3b prettier / easier to use?

Option 4: Background Task

I found this option here: https://mcmap.net/q/539996/-how-do-i-implement-an-async-drop-in-rust
It basically spawns a background task in the constructor of the object that waits for a trigger and runs the async drop code when triggered. The drop implementation then triggers it and runs a busy-waiting loop until it is finished.

This seems overly complex and also more error prone than the other options here. Or is this actually the best solution?
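For reference, the shape of that approach can be sketched with plain threads and std channels instead of tokio tasks. This is only an illustration under my own assumptions: Guarded, Log, and background_demo are hypothetical names, the worker is a thread rather than a tokio task, and drop waits with a blocking recv() rather than the busy loop described in the linked answer.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

// Shared log so the demo can observe the order of events (illustration only).
type Log = Arc<Mutex<Vec<&'static str>>>;

// Hypothetical type whose cleanup runs on a worker created in the constructor.
struct Guarded {
    trigger: Option<Sender<()>>,
    done: Receiver<()>,
    log: Log,
}

impl Guarded {
    fn new(log: Log) -> Self {
        let (trigger, wait_for_drop) = channel::<()>();
        let (report_done, done) = channel::<()>();
        let worker_log = Arc::clone(&log);
        thread::spawn(move || {
            // Sleeps here until drop() sends the trigger.
            if wait_for_drop.recv().is_ok() {
                worker_log.lock().unwrap().push("cleanup ran");
                let _ = report_done.send(());
            }
        });
        Self { trigger: Some(trigger), done, log }
    }
}

impl Drop for Guarded {
    fn drop(&mut self) {
        // Wake the worker, then block until it reports completion.
        if let Some(trigger) = self.trigger.take() {
            let _ = trigger.send(());
        }
        let _ = self.done.recv();
        self.log.lock().unwrap().push("drop returned");
    }
}

// Creates and immediately drops one value, then returns the recorded events.
fn background_demo() -> Vec<&'static str> {
    let log: Log = Arc::new(Mutex::new(Vec::new()));
    drop(Guarded::new(Arc::clone(&log)));
    let entries = log.lock().unwrap().clone();
    entries
}
```

Note that drop still blocks the calling thread until the worker has finished, so this shape does not by itself avoid the blocking concern raised for the other options.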

Side question on exhausting worker threads

Except for option 1, all of these options block a tokio worker thread to wait for the async drop to complete. In a multi-threaded runtime, this will work most of the time, but could in theory lock up all worker threads if multiple destructors run in parallel, and IIUC we would then have a deadlock with no thread making progress. Option 1 seems somewhat better, but the block_on documentation says it can only drive non-IO futures, so it could still lock up if too many destructors do IO work. Is there a way to tell tokio to increase the number of worker threads by one? If we do that for each thread we block, would that avoid this issue?

Option 5: new runtime in new thread

impl Drop for MyClass {
    fn drop(&mut self) {
        std::thread::scope(|s| {
            s.spawn(|| {
                let runtime = tokio::runtime::Builder::new_multi_thread()
                    .build()
                    .unwrap();
                runtime.block_on(self.terminate());
            });
        });
    }
}

see playground

This seems to work and attempts to avoid the issue of blocking worker threads by running the drop task on a new runtime in a new thread. This new thread should, hopefully, be able to drive IO tasks. But does this actually fully solve the problem? What if the drop task depends on an IO task that is running on the main tokio executor? I think this may still have a chance of causing the program to lock up indefinitely.

Hildy answered 19/3, 2022 at 20:23 Comment(2)
Async drop is an unsolved problem. I don't know if you can take tokio assets from one runtime instance and send them into a different runtime instance, but I suspect not. I think you should try these out with a less trivial example, for example put a tokio timer call in terminate. This will be a better demonstration of what will and won't work. - Guideboard
For instance, I don't think the futures::executor::block_on executor is compatible with tokio APIs. - Guideboard

Option 1: tokio::runtime::Handle::block_on

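Runtime::block_on is the entry point to a tokio runtime; it's what runs when you annotate with #[tokio::main], for example. Despite the wording of the panic message, Handle::block_on does not start a second runtime: it runs the future on the existing runtime while blocking the calling thread, and tokio panics when that thread is one that is currently driving async tasks. Blocking a worker thread like that is something you definitely don't want!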

Option 2: futures::executor::block_on

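This works, but blocks, so is non-ideal, as no other tasks on this thread can make progress until it finishes. On a current-thread runtime it can also hang forever if terminate() awaits tokio IO or timers, because the only thread that could drive them is the one being blocked.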

Option 3: tokio::task::spawn with futures::executor::block_on

You don't need block_on here; a spawned task runs in the background until it completes, as long as the runtime is not shut down first. No need to block any threads! This is what I would do. However, you noticed a problem: the future borrows self, and if the compiler allowed this, it would result in a memory error. Let's pretend we can do it:

  1. We have foo: MyClass.
  2. foo is dropped.
  3. We spawn a task with a reference to foo to run foo.terminate().
  4. foo no longer exists, but we have a background task with a reference to it! The best case is a seg-fault.

So how can we avoid it? This leads right into Option 3b.

Option 3b

I think this is a good solution (again, without the block_on).
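The ownership move that makes this legal can be seen without any runtime. Here is a minimal sketch that uses std::thread::spawn, which has the same 'static requirement as tokio::spawn; Resource, its log field, and demo are hypothetical names that exist only so the cleanup can be observed.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Hypothetical resource; `log` only exists so the demo can observe cleanup.
struct Resource {
    name: &'static str,
    log: Sender<String>,
}

impl Resource {
    // Takes `self` by value, so the cleanup owns everything it touches.
    fn terminate(self) {
        let _ = self.log.send(format!("terminated {}", self.name));
    }
}

// Wrapper whose Drop moves the resource out instead of borrowing it.
struct Wrapper(Option<Resource>);

impl Drop for Wrapper {
    fn drop(&mut self) {
        // `take()` leaves `None` behind, so nothing borrowed from `self`
        // escapes into the spawned closure.
        if let Some(resource) = self.0.take() {
            // Detached on purpose: drop() returns without waiting.
            thread::spawn(move || resource.terminate());
        }
    }
}

// Drops one wrapper and returns the message its cleanup sent.
fn demo() -> String {
    let (tx, rx): (Sender<String>, Receiver<String>) = channel();
    drop(Wrapper(Some(Resource { name: "foo", log: tx })));
    rx.recv().expect("cleanup thread should send one message")
}
```

With tokio, the thread::spawn call would become tokio::spawn(async move { ... }) around an async terminate(self); the Option::take step stays the same.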

If MyClass has a cheap default() implementation, then you don't need the wrapper, and can swap it with a default. My first thought was to call std::mem::take, which leaves a default in its place, but this runs into a problem: the value you took out is itself a MyClass, so dropping it calls drop again, which takes and drops another value, and so on until the stack overflows. So, we can use a flag to indicate it's been dropped:

#[derive(Default)]
struct MyClass {
    dropped: bool,
}

impl MyClass {
    async fn terminate(&mut self) {
        println!("Terminating");
    }
}

impl Drop for MyClass {
    fn drop(&mut self) {
        if !self.dropped {
            let mut this = MyClass::default();
            std::mem::swap(&mut this, self);
            this.dropped = true;
            tokio::spawn(async move { this.terminate().await });
        }
    }
}

If you find yourself wanting to reach for this a lot, you could create a Dropper wrapper to use with various types:

#[async_trait::async_trait]
pub trait AsyncDrop {
    async fn async_drop(&mut self);
}

#[derive(Default)]
pub struct Dropper<T: AsyncDrop + Default + Send + 'static> {
    dropped: bool,
    inner: T,
}

impl<T: AsyncDrop + Default + Send + 'static> Dropper<T> {
    pub fn new(inner: T) -> Self {
        Self {
            dropped: false,
            inner,
        }
    }
}

impl<T: AsyncDrop + Default + Send + 'static> Drop for Dropper<T> {
    fn drop(&mut self) {
        if !self.dropped {
            let mut this = Dropper::default();
            std::mem::swap(&mut this, self);
            this.dropped = true;

            tokio::spawn(async move {
                this.inner.async_drop().await;
            });
        }
    }
}

Option 4: Background Task

This is covered already by another answer: https://mcmap.net/q/533826/-rust-async-drop

Option 5: new runtime in a new thread

I would definitely not spawn a new runtime every time you want to drop; that is very heavy-handed.

By using a scoped thread, you also don't solve the problem of blocking. The thread will join at the end of the scope, which is immediate, and block until the runtime completes.
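The blocking behaviour of a scoped thread can be checked with std alone. In this rough sketch, SlowDrop and cleanup_done_when_drop_returns are hypothetical names, and a short sleep stands in for runtime.block_on(self.terminate()).

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

// Hypothetical type whose drop does slow work on a scoped thread.
struct SlowDrop<'a> {
    finished: &'a AtomicBool,
}

impl Drop for SlowDrop<'_> {
    fn drop(&mut self) {
        let finished = self.finished;
        thread::scope(|s| {
            s.spawn(move || {
                // Stand-in for the slow cleanup work.
                thread::sleep(Duration::from_millis(50));
                finished.store(true, Ordering::SeqCst);
            });
        }); // thread::scope joins every scoped thread before returning
    }
}

// Returns true if the cleanup had already finished when drop() returned.
fn cleanup_done_when_drop_returns() -> bool {
    let finished = AtomicBool::new(false);
    drop(SlowDrop { finished: &finished });
    finished.load(Ordering::SeqCst)
}
```

The flag is always set by the time drop returns, which means the dropping thread waited for the whole cleanup.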

Chimborazo answered 27/2, 2023 at 18:15 Comment(3)
Regarding option 3, it seems like an alternative could be to derive Clone for the struct, and clone its data within drop for sending to the new thread (for some cases). - Peristome
Why doesn't it guarantee the order of the drops? It does drop at the end as expected, but if I have multiple drops it won't go in reverse order. Always different. - Calbert
Why doesn't what guarantee the order of drops? If you're talking about Option 3b, we're spawning a task to do the dropping. Tasks are executed in parallel, so you can't guarantee order of execution. If you need to guarantee order, you could use a channel and a single long-lived background task that reads from it and drops in order. - Chimborazo
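A rough sketch of that channel idea, written with std threads and std::sync::mpsc so it stands alone; Ordered, start_worker, and ordered_demo are hypothetical names. With tokio, the worker would be a spawned task reading from tokio::sync::mpsc::unbounded_channel, whose send is also a non-async call and so can be used from drop.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical guard: on drop it hands its id to the cleanup worker.
struct Ordered {
    id: u32,
    queue: Sender<u32>,
}

impl Drop for Ordered {
    fn drop(&mut self) {
        // send() does not block on this unbounded channel; ignore the
        // error if the worker is already gone.
        let _ = self.queue.send(self.id);
    }
}

// Starts the single worker; it returns ids in the order it cleaned them up.
fn start_worker() -> (Sender<u32>, JoinHandle<Vec<u32>>) {
    let (tx, rx) = channel::<u32>();
    let worker = thread::spawn(move || {
        let mut cleaned = Vec::new();
        // The loop ends once every Sender has been dropped.
        for id in rx {
            cleaned.push(id); // real cleanup would happen here, one at a time
        }
        cleaned
    });
    (tx, worker)
}

fn ordered_demo() -> Vec<u32> {
    let (tx, worker) = start_worker();
    {
        let _a = Ordered { id: 1, queue: tx.clone() };
        let _b = Ordered { id: 2, queue: tx.clone() };
        let _c = Ordered { id: 3, queue: tx.clone() };
        // Locals drop in reverse declaration order: 3, 2, 1.
    }
    drop(tx); // last Sender gone, so the worker loop ends
    worker.join().expect("worker should not panic")
}
```

Because a single consumer processes the queue, cleanup happens in the order the drops were sent rather than in whatever order separately spawned tasks get scheduled.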

If you want to "do something" without exclusive mutable access to MyClass, maybe using oneshot channels to trigger the async work would do? This is somewhat similar to Option 4.

You can send some extra state through the channel too.

use std::time::Duration;

// Only the items actually used below are imported.
use tokio::{
    sync::oneshot::{self, Sender},
    time::interval,
};

struct MyClass {
    tx: Option<Sender<()>>, // can have SomeStruct instead of () 
    // my_state: Option<SomeStruct>
}

impl MyClass {
    pub async fn new() -> Self {
        println!("MyClass::new()");

        let (tx, mut rx) = oneshot::channel();

        tokio::task::spawn(async move {
            let mut interval = interval(Duration::from_millis(100));

            println!("drop wait loop starting...");

            loop {
                tokio::select! {
                    _ = interval.tick() => println!("Another 100ms"),
                    _ = &mut rx => {
                        println!("should process drop here");
                        break;
                    }
                }
            }
        });

        Self { tx: Some(tx) }
    }
}

impl Drop for MyClass {
    fn drop(&mut self) {
        println!("drop()");
        // Avoid panicking inside drop: the receiving task may already be gone.
        if let Some(tx) = self.tx.take() {
            let _ = tx.send(());
            // With state: let _ = tx.send(self.my_state.take().unwrap());
        }
    }
}

#[tokio::main]
async fn main() {
    let class = MyClass::new().await;
}

This prints most of the time:

MyClass::new()
drop()
drop wait loop starting...
should process drop here

Sometimes the process exits before the spawned task gets a chance to run, in which case the last two lines are missing. In a long-running program this should be fine.

The select! with interval.tick() is not strictly necessary; it only shows that the task stays alive while waiting. oneshot::Receiver implements Future, so the task can simply do rx.await and then run the cleanup.

Foresheet answered 4/4, 2022 at 17:35 Comment(0)
