Signaling main thread when std::future is ready to be retrieved
I'm trying to understand the std::async, std::future system. What I don't quite understand is how you deal with running multiple async "tasks", and then, based on what returns first, second, etc, running some additional code.

Example: Let's say your main thread is in a simple loop. Now, based on user input, you run several functions via std::async, and save the futures in a std::list.

My issue is, how do I pass information back from the std::async function that can specify which future is complete?

My main thread is basically in a message loop, and what I need to do is have a function run by std::async be able to queue a message that somehow specifies which future is complete. The issue is that the function doesn't have access to the future.

Am I just missing something?

Here is some pseudo-code of what I'm trying to accomplish; extra points if there is also a way to "cancel" the request using a cancellation token.

class RequestA
{
public:
    int input1;

    int output1;
};

main()
{
    while(1)
    {
       //check for completion
       // i.e. pop next "message"
       if(auto *completed_task = get_next_completed_task())
       {
          completed_task->run_continuation();
       }

       // other code to handle user input
       if(userSaidRunA())
       {
          // note that I don't want to use a raw pointer but
          // am not sure how to use future for this
          RequestA *a = new RequestA();
          run(a, OnRequestTypeAComplete);
       }

    }
}

void OnRequestTypeAComplete(RequestA &req)
{
    // Do stuff with req, want access to inputs and output
}
Rebuild answered 24/8, 2017 at 2:56 Comment(3)
You could pass a callable to the executing thread that would put a message in the queue. – Dinesen
std::async operates on a Callable object. You could, for instance, assign an ID number to that object when you create it, and store that ID in the list with the std::future object. The Callable could then post its ID to the message queue, and the message handler can look for the ID in the list. Another option would be to add an empty std::future to the list first, then pass an iterator to that object to the Callable so it can post it back to the message queue. Move the result of std::async to that existing std::future object, and have the message handler use the posted iterator. – Esmeraldaesmerelda
You may want to take a look at e.g. the HPX library (stellar-group.org/category/hpx). The standard library currently does not have elegant solutions to these issues. In particular, HPX offers continuations for futures with then(), and waiting for any of a whole range of futures with when_any()/wait_any(). – Osana
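The ID-based approach suggested in the comments above could be sketched roughly as follows. This is only an illustration under assumptions: `CompletionQueue`, `start_task`, and `run_three_tasks` are hypothetical names, not part of the standard library or of the question's code, and the queue merely stands in for the main thread's real message queue.

```cpp
#include <condition_variable>
#include <cstddef>
#include <future>
#include <mutex>
#include <queue>
#include <unordered_map>
#include <utility>

// Hypothetical helper: a thread-safe queue of task IDs that stands in for
// the main thread's message queue.
class CompletionQueue {
public:
    void post(std::size_t id) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ids_.push(id);
        }
        cv_.notify_one();
    }

    // Blocks until some task has posted its ID, then returns that ID.
    std::size_t wait_and_pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !ids_.empty(); });
        std::size_t id = ids_.front();
        ids_.pop();
        return id;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::size_t> ids_;
};

// Starts `work` asynchronously and stores its future under `id`.
// The wrapper lambda posts the ID once the work itself has finished.
template <class Work>
void start_task(std::unordered_map<std::size_t, std::future<int>>& pending,
                CompletionQueue& done, std::size_t id, Work work) {
    pending.emplace(id, std::async(std::launch::async,
        [&done, id, work]() mutable {
            int result = work();
            done.post(id);  // announce completion just before returning
            return result;
        }));
}

// Demo: runs three tasks and sums their results, touching each future only
// after its ID has arrived on the queue (no scan over all futures).
int run_three_tasks() {
    CompletionQueue done;  // declared first so it outlives the futures
    std::unordered_map<std::size_t, std::future<int>> pending;
    for (std::size_t id = 1; id <= 3; ++id) {
        start_task(pending, done, id,
                   [id] { return static_cast<int>(id) * 10; });
    }
    int sum = 0;
    while (!pending.empty()) {
        std::size_t id = done.wait_and_pop();
        auto it = pending.find(id);
        // May block very briefly: the ID is posted just before the task returns.
        sum += it->second.get();
        pending.erase(it);
    }
    return sum;
}
```

Because the same thread both inserts into `pending` and pops from the queue, the map itself needs no lock in this sketch; a design where several threads start tasks would need one.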
L
5

Unfortunately, C++11 std::future provides neither continuations nor cancellation. You can retrieve the result from a std::future only once. Moreover, a future returned from std::async blocks in its destructor until the task finishes. There is a group headed by Sean Parent at Adobe that has implemented future, async, and task as they should be, along with continuation-style functions such as when_all and when_any. It could be what you're looking for; in any case, have a look at this project. The code is of good quality and easy to read.

If platform-dependent solutions are also OK for you, you can look at those. On Windows, I know of the PPL library, which also has primitives with cancellation and continuation.
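Since std::future has no built-in cancellation, one common workaround is cooperative cancellation: the caller sets a shared flag and the task checks it at convenient points. The following is a minimal sketch of that idea, not a feature of the standard library; `CancelToken`, `make_cancel_token`, `count_steps`, and `run_and_cancel` are hypothetical names introduced only for illustration.

```cpp
#include <atomic>
#include <chrono>
#include <future>
#include <memory>
#include <thread>

// Hypothetical cancellation token: a shared flag that the caller sets and
// the task polls. Shared ownership keeps it alive for both sides.
using CancelToken = std::shared_ptr<std::atomic<bool>>;

inline CancelToken make_cancel_token() {
    return std::make_shared<std::atomic<bool>>(false);
}

// Performs up to `steps` units of simulated work, checking the token before
// each one. Returns the number of steps completed, which is smaller than
// `steps` if cancellation was requested in time.
int count_steps(CancelToken cancel, int steps) {
    int done = 0;
    while (done < steps && !cancel->load()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        ++done;
    }
    return done;
}

// Demo: starts a task that would run for a very long time, requests
// cancellation shortly afterwards, and returns how far the task got.
int run_and_cancel() {
    CancelToken cancel = make_cancel_token();
    std::future<int> fut =
        std::async(std::launch::async, count_steps, cancel, 1000000);
    std::this_thread::sleep_for(std::chrono::milliseconds(20));
    cancel->store(true);  // the task stops at its next check of the flag
    return fut.get();
}
```

The task only stops where it checks the flag, so a long blocking call inside the task would delay cancellation until that call returns.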

Levo answered 24/8, 2017 at 8:46 Comment(1)
The issue is that when_any doesn't help with the continuation or the cancellations. I've looked at PPL, and I don't think it really fits what I'm trying to do. – Rebuild
A
2

You can create a struct containing a flag and pass a reference to that flag to your thread function.

Something a bit like this:

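#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <functional>
#include <future>
#include <iostream>
#include <random>
#include <thread>
#include <vector>

// Stand-in for the hol::random_number helper used below, which is not part
// of the standard library: assumed to return a random integer in [0, max].
namespace hol {
inline int random_number(int max)
{
    thread_local std::mt19937 gen{std::random_device{}()};
    return std::uniform_int_distribution<int>{0, max}(gen);
}
}

int stuff(std::atomic_bool& complete, std::size_t id)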
{
    std::cout << "starting: " << id << '\n';

    // do stuff
    std::this_thread::sleep_for(std::chrono::milliseconds(hol::random_number(3000)));

    // generate value
    int value = hol::random_number(30);

    // signal end
    complete = true;
    std::cout << "ended: " << id << " -> " << value << '\n';

    return value;
}

struct task
{
    std::future<int> fut;
    std::atomic_bool complete{false}; // a default-constructed atomic is uninitialized before C++20

    task() = default;
    task(task&& t): fut(std::move(t.fut)), complete(t.complete.load()) {}
};

int main()
{
    // list of tasks
    std::vector<task> tasks;

    // reserve enough spaces so that nothing gets reallocated
    // as that would invalidate the references to the atomic_bools
    // needed to signal the end of a thread
    tasks.reserve(3);

    // create a new task
    tasks.emplace_back();

    // start it running
    tasks.back().fut = std::async(std::launch::async, stuff, std::ref(tasks.back().complete), tasks.size());

    tasks.emplace_back();
    tasks.back().fut = std::async(std::launch::async, stuff, std::ref(tasks.back().complete), tasks.size());

    tasks.emplace_back();
    tasks.back().fut = std::async(std::launch::async, stuff, std::ref(tasks.back().complete), tasks.size());

    // Keep going as long as any of the tasks is incomplete
    while(std::any_of(std::begin(tasks), std::end(tasks),
        [](auto& t){ return !t.complete.load(); }))
    {

        // do some parallel stuff
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }

    // process the results

    int sum = 0;
    for(auto&& t: tasks)
        sum += t.fut.get();

    std::cout << "sum: " << sum << '\n';
}
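As an alternative to a separate completion flag, a future can be asked directly whether its result is available by calling wait_for with a zero timeout. The helper below is a small sketch; `is_ready` and `ready_flips_after_set_value` are hypothetical names, and the check assumes the future is valid and was not created with std::launch::deferred.

```cpp
#include <chrono>
#include <future>

// Returns true if the future's result can be retrieved without blocking.
// Assumes a valid future that was not launched with std::launch::deferred
// (a deferred future reports future_status::deferred instead of ready).
template <class T>
bool is_ready(const std::future<T>& fut) {
    return fut.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}

// Demo using a promise so that the moment of completion is under our control.
bool ready_flips_after_set_value() {
    std::promise<int> p;
    std::future<int> f = p.get_future();
    bool before = is_ready(f);  // no value has been set yet
    p.set_value(42);
    bool after = is_ready(f);   // the value is now available
    return !before && after;
}
```

With such a helper, the loop condition in the example above could test `is_ready(t.fut)` for each task. It still scans every task, so it does not remove the iteration itself.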
Awildaawkward answered 24/8, 2017 at 3:56 Comment(2)
This isn't exactly what I'm trying to do. What I want to be able to do is identify which task completed without iterating through all of the tasks. One of my ideas was to have the end of the async call enqueue a struct which has a continuation lambda, which could possibly contain some of the state of the "request". – Rebuild
@bpelkes I'm not sure the standard libraries can do that in a neat way, see #19225872. – Seneschal
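The idea from the comment above, having the async call enqueue a continuation that carries the request's state, might look something like the sketch below. It is an illustration under assumptions rather than a tested design: `ContinuationQueue` and `run_request_a` are hypothetical names, `RequestA` mirrors the struct from the question, and doubling the input merely stands in for real work.

```cpp
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical message queue whose messages are continuations to be run on
// the consuming (main) thread.
class ContinuationQueue {
public:
    void post(std::function<void()> continuation) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            pending_.push(std::move(continuation));
        }
        cv_.notify_one();
    }

    // Blocks until a continuation is available and returns it.
    std::function<void()> wait_and_pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !pending_.empty(); });
        std::function<void()> next = std::move(pending_.front());
        pending_.pop();
        return next;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> pending_;
};

// Mirrors the request type from the question.
struct RequestA {
    int input1;
    int output1;
};

// Demo: the worker fills in the request, then posts a continuation that
// captures the request. The calling thread runs that continuation and
// returns the output it observed.
int run_request_a(int input) {
    ContinuationQueue messages;  // declared first so it outlives the worker
    int observed = 0;
    auto request = std::make_shared<RequestA>();
    request->input1 = input;
    request->output1 = 0;

    std::future<void> worker = std::async(std::launch::async,
        [&messages, &observed, request] {
            request->output1 = request->input1 * 2;  // placeholder "work"
            messages.post([&observed, request] {
                observed = request->output1;         // runs on the main thread
            });
        });

    messages.wait_and_pop()();  // pop the next message and run its continuation
    worker.get();
    return observed;
}
```

Holding the request in a std::shared_ptr avoids the raw pointer from the question, and the popped message already identifies which request completed, so no scan over outstanding tasks is needed.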
P
2

Here is a solution that uses a std::unordered_map instead of a std::list and doesn't require you to modify your callables. Instead, a helper function assigns an ID to each task and notifies you when the task finishes:

class Tasks {
public:
    /*
     *  Helper to create the tasks in a safe way.
     *  lockTaskCreation is needed to guarantee newTask is (temporarily)
     *  assigned before it is moved to the list of tasks
     */
    template <class R, class ...Args>
    void createNewTask(const std::function<R(Args...)>& f, Args... args) {
        std::unique_lock<std::mutex> lock(mutex);
        std::lock_guard<std::mutex> lockTaskCreation(mutexTaskCreation);
        newTask = std::async(std::launch::async, executeAndNotify<R, Args...>,
            std::move(lock), f, std::forward<Args>(args)...);
    }

private:
    /*
     *  Assign an id to the task, execute it, and notify when it finishes
     */
    template <class R, class ...Args>
    static R executeAndNotify(std::unique_lock<std::mutex> lock,
        const std::function<R(Args...)>& f, Args... args)
    {
        {
            std::lock_guard<std::mutex> lockTaskCreation(mutexTaskCreation);
            tasks[std::this_thread::get_id()] = std::move(newTask);
        }
        lock.unlock();
        Notifier notifier;
        return f(std::forward<Args>(args)...);
    }

    /*
     *  Class to notify when a task is completed (follows RAII)
     */
    class Notifier {
    public:
        ~Notifier() {
            std::lock_guard<std::mutex> lock(mutex);
            finishedTasks.push(std::this_thread::get_id());
            cv.notify_one();
        }
    };

    /*
     *  Wait for a finished task.
     *  This function needs to be called in an infinite loop
     */
    static void waitForFinishedTask() {
        std::unique_lock<std::mutex> lock(mutex);
        cv.wait(lock, [] { return finishedTasks.size() || finish; });
        if (finishedTasks.size()) {
            auto threadId = finishedTasks.front();
            finishedTasks.pop();
            auto result = tasks.at(threadId).get();
            tasks.erase(threadId);
            std::cout << "task " << threadId
                << " returned: " << result << std::endl;
        }
    }

    static std::unordered_map<std::thread::id, std::future<int>> tasks;
    static std::mutex mutex;
    static std::mutex mutexTaskCreation;
    static std::queue<std::thread::id> finishedTasks;
    static std::condition_variable cv;
    static std::future<int> newTask;

    ...
};

...

Then, you can call an async task in this way:

int doSomething(int i) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    return i;
}

int main() {
    Tasks tasks;
    tasks.createNewTask(std::function<decltype(doSomething)>(doSomething), 10);
    return 0;
}

See a complete implementation run on Coliru

Payoff answered 21/9, 2017 at 4:8 Comment(0)
