How to store self-removing futures in a list
I have some tasks that need to run asynchronously, and the server can't close while any of them are still running. So I'm storing the futures returned by std::async in a list, but I don't want that list to grow indefinitely, so I'd like to remove each future once it completes.

Here's roughly what I'm trying to do:

// this is a member of the server class
std::list<std::future<void>> pending;

std::list<std::future<void>>::iterator iter = ???;

pending.push_back( std::async( std::launch::async, [iter]()
{
    doSomething();
    pending.erase( iter ); // erase by iterator; list::remove takes a value
} ) );

Here, iter needs to point to the newly inserted element, but I can't get it before inserting (the element doesn't exist yet), nor afterwards (the lambda has already captured it by value). I could make a shared_ptr to store the iterator, but that seems like overkill.

Is there a better pattern for this?

Update: there seems to be another issue with this. When a future attempts to remove itself from the list, it is essentially waiting for itself to complete, which locks everything up. Oops!

On top of that, the list's destructor empties the list before the element destructors run.

Chastise answered 18/3, 2019 at 10:55 Comment(7)
Are you aware that this code will require some kind of synchronization anyway, like a mutex? - Sejant
You mean for the removal? Yes, I'll need something like that. The push operation is only called from the socket listener, so that should be synchronized already. - Chastise
I'd be careful with that. The C++ memory model does not guarantee visibility of changes between threads without memory fences. - Sejant
Oh right, I will need to sync the pushes with the removes anyway, so I might as well put a mutex everywhere. - Chastise
Or use a lock-free list, whatever works. - Sejant
Possible duplicate of What's the C++ 11 way to fire off an asynchronous task and forget about it? - Procarp
I don't want to forget about it though, because the server needs to wait for tasks to be done before closing. It could be done via some counters/conditions, but it seems std::async was made exactly to avoid going through all this trouble. - Chastise

It appears you can just append a default-constructed std::future to the list, get an iterator to it, and then move your real future in.

Mind you, that non-mutex-protected remove(iter) looks awfully dangerous.

Tickle answered 18/3, 2019 at 11:13 Comment(7)
Not only the remove, the push_back needs to be synchronized as well. - Buran
@DanielLangr: That is one solution. The other is to drop the idea that futures remove themselves, and instead have the same thread do the push_back and the erase. - Tickle
Actually, now that I think about it, is it even possible for a future to remove itself from the list? Wouldn't it lock everything up, since it would be waiting for itself to complete? - Chastise
@riv: Have you checked if there's already a StackOverflow question about that? Because it is a good question, and shouldn't be buried in a comment. - Tickle
I'm trying to see what happens for myself and I'm getting a runtime error saying an iterator can't be incremented, so once I figure out what that means I might ask a question. - Chastise
Oh, that's because in my example the list itself is emptied (due to going out of scope) before the elements are destroyed. That seems to be a problem. - Chastise
@riv: Seeing your comment on a deleted answer, why bother removing the futures from the list at all? As the list is going out of scope anyway, the contents do not matter. - Tickle

Here's one way. I don't think this one needs futures:

#include <unordered_set>
#include <condition_variable>
#include <mutex>
#include <thread>

struct server
{
    std::mutex pending_mutex;
    std::condition_variable pending_condition;
    std::unordered_set<unsigned> pending;
    unsigned next_id = 0;

    void add_task()
    {
        auto lock = std::unique_lock(pending_mutex);
        auto id = next_id++;
        auto t = std::thread([this, id]{
            this->doSomething();
            this->notify_complete(id);
        });
        t.detach(); // or store it somewhere, e.g. pending could map id -> thread
        pending.insert(id); // safe: the mutex is still held, so notify_complete
                            // cannot erase the id before it is inserted
    }

    void doSomething();

    void notify_complete(unsigned id)
    {
        auto lock = std::unique_lock(pending_mutex);
        pending.erase(id);
        if (pending.empty())
            pending_condition.notify_all();
    }

    void wait_all_complete()
    {
        auto none_left = [&] { return pending.empty(); };

        auto lock = std::unique_lock(pending_mutex);
        pending_condition.wait(lock, none_left);
    }
};


int main()
{
    auto s = server();
    s.add_task();
    s.add_task();
    s.add_task();
    s.wait_all_complete();
}

Here it is with futures, in case that's important:

#include <unordered_map>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <future>

struct server
{
    std::mutex pending_mutex;
    std::condition_variable pending_condition;
    std::unordered_map<unsigned, std::future<void>> pending;
    unsigned next_id = 0;

    void add_task()
    {
        auto lock = std::unique_lock(pending_mutex);
        auto id = next_id++;
        auto f = std::async(std::launch::async, [this, id]{
            this->doSomething();
            this->notify_complete(id);
        });
        pending.emplace(id, std::move(f));
    }

    void doSomething();

    void notify_complete(unsigned id)
    {
        auto lock = std::unique_lock(pending_mutex);
        // caveat: erase destroys this task's own future, and a std::async
        // future's destructor blocks until the task finishes (see the
        // question's update), so this can deadlock
        pending.erase(id);
        if (pending.empty())
            pending_condition.notify_all();
    }

    void wait_all_complete()
    {
        auto none_left = [&] { return pending.empty(); };

        auto lock = std::unique_lock(pending_mutex);
        pending_condition.wait(lock, none_left);
    }
};


int main()
{
    auto s = server();
    s.add_task();
    s.add_task();
    s.add_task();
    s.wait_all_complete();
}

Here's the list version:

#include <list>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <future>

struct server
{
    using pending_list = std::list<std::future<void>>;
    using id_type = pending_list::const_iterator;

    std::mutex pending_mutex;
    std::condition_variable pending_condition;
    pending_list pending;

    void add_task()
    {
        auto lock = std::unique_lock(pending_mutex);

        // insert a default-constructed future first so we can capture its iterator
        auto id = pending.emplace(pending.end());
        auto f = std::async(std::launch::async, [this, id]{
            this->doSomething();
            this->notify_complete(id);
        });
        *id = std::move(f);
    }

    void doSomething();

    void notify_complete(id_type id)
    {
        auto lock = std::unique_lock(pending_mutex);
        // same caveat as the map version: erasing destroys this task's own
        // future, whose destructor blocks until the task finishes
        pending.erase(id);
        if (pending.empty())
            pending_condition.notify_all();
    }

    void wait_all_complete()
    {
        auto none_left = [&] { return pending.empty(); };

        auto lock = std::unique_lock(pending_mutex);
        pending_condition.wait(lock, none_left);
    }
};


int main()
{
    auto s = server();
    s.add_task();
    s.add_task();
    s.add_task();
    s.wait_all_complete();
}
Cockcrow answered 18/3, 2019 at 11:42 Comment(3)
The second version is roughly the same as what I'm trying to do, except it uses a map instead of a list, which I figured is unnecessary since the IDs are meaningless to me, so I might as well use iterators. - Chastise
Sure, you could use a list, although the overhead of using a map is minimal. You'd need to default-construct the map entry and then move the future in. This might require some exception handling for completeness. I find the map solution very maintainable and flexible in the real world. Pay your money, take your choice. :) - Cockcrow
@Chastise added a list version for completeness. - Cockcrow

© 2022 - 2024 — McMap. All rights reserved.