How do I reverse set_value() and 'deactivate' a promise?

4

4

I have a total n00b question here on synchronization. I have a 'writer' thread which assigns a different value 'p' to a promise at each iteration, and 'reader' threads which wait on shared_futures for this value and then process it. How do I use future/promise to ensure that the reader threads wait for a new update of 'p' before performing their processing task at each iteration? Many thanks.

Diaconal answered 30/6, 2011 at 21:34 Comment(3)
Will all the readers be working on the same value of 'p'? Or will each reader be working on a different value? Is a single-writer, multiple-reader queue an appropriate fit here? – Macaw
All the readers work on the same value of 'p' in my case. So my issue is how to set a new value of 'p' at each iteration of the writer, and have the readers wait for this new value after each processing step is complete. – Diaconal
You should really issue a new promise each time, not re-use the same promise. – Whiles
4

You can "reset" a promise by assigning a fresh, default-constructed promise to it.

myPromise = promise< int >();

A more complete example:

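#include <chrono>
#include <future>
#include <iostream>
#include <thread>
using namespace std;

promise< int > myPromise;   // Shared by both threads without any locking.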

void writer()
{
    for( int i = 0; i < 10; ++i )
    {
        cout << "Setting promise.\n";
        myPromise.set_value( i );

        myPromise = promise< int >{};       // Reset the promise.

        cout << "Waiting to set again...\n";
        this_thread::sleep_for( chrono::seconds( 1 ));
    }
}

void reader()
{
    int result;
    do
    {
        auto myFuture = myPromise.get_future();
        cout << "Waiting to receive result...\n";
        result = myFuture.get();
        cout << "Received " << result << ".\n";
    } while( result < 9 );
}

int main()
{
    std::thread write( writer );
    std::thread read( reader );

    write.join();
    read.join();

    return 0;
}

A problem with this approach, however, is that nothing synchronizes the two threads' access to myPromise. The writer can set a value and reset the promise before the reader has obtained a future from it, so that value is lost. The reader can call get_future() a second time on a promise the writer has not yet reset, which throws std::future_error. And the reader can call get_future() while the writer is in the middle of reassigning the promise, which is a data race. These problems can be papered over with care (e.g. with carefully timed sleeps between calls), but this takes us into the realm of hacking and guesswork rather than logically correct concurrency.

So although it's possible to reset a promise by assigning a fresh promise to it, doing so tends to raise broader synchronization issues.
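
One way to sidestep the guesswork is to drop promise/future for the repeated hand-off and use a mutex, a condition variable and a version counter instead. The following is only a minimal sketch under that assumption, not the method used in this answer; Broadcast, publish, wait_for_update and run_demo are hypothetical names introduced for illustration.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: holds the latest value plus a version counter.
class Broadcast {
public:
    // Writer side: store a new value and wake every waiting reader.
    void publish(int value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            value_ = value;
            ++version_;
        }
        changed_.notify_all();
    }

    // Reader side: block until a version different from `seen` exists,
    // then record that version in `seen` and return the current value.
    int wait_for_update(unsigned& seen) {
        std::unique_lock<std::mutex> lock(mutex_);
        changed_.wait(lock, [&] { return version_ != seen; });
        seen = version_;
        return value_;
    }

private:
    std::mutex mutex_;
    std::condition_variable changed_;
    int value_ = 0;
    unsigned version_ = 0;
};

// Runs one writer (the calling thread) and `readers` reader threads.
// The writer publishes 0..last; each reader loops until it sees `last`.
// Returns the final value each reader observed.
std::vector<int> run_demo(int readers, int last) {
    Broadcast channel;
    std::vector<int> final_values(readers, -1);
    std::vector<std::thread> pool;
    for (int r = 0; r < readers; ++r) {
        pool.emplace_back([&channel, &final_values, r, last] {
            unsigned seen = 0;
            int p;
            do {
                p = channel.wait_for_update(seen);
                // process p here ...
            } while (p != last);
            final_values[r] = p;
        });
    }
    for (int i = 0; i <= last; ++i) {
        channel.publish(i);
    }
    for (auto& t : pool) {
        t.join();
    }
    return final_values;
}
```

Calling run_demo(3, 9) starts three readers that each wait for a fresh version before processing. Note that in this sketch a slow reader sees only the latest value and may skip intermediate ones; if every reader must process every value, the writer also has to wait for the readers before publishing again.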

Arminius answered 25/2, 2015 at 17:9 Comment(0)
3

A promise/future pair is designed to carry only a single value (or exception). To do what you're describing, you probably want to adopt a different tool.

If you wish to have multiple threads (your readers) all stop at a common point, you might consider a barrier.
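
As a rough illustration of the barrier idea, the sketch below lets one writer and several readers advance in lock-step: one barrier phase means "p is ready", the next means "everyone is done with p". SimpleBarrier and run_lockstep are hypothetical names written for this example; C++20 provides std::barrier, which could replace the hand-written class if it is available to you.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical minimal reusable barrier built on a mutex and condition variable.
class SimpleBarrier {
public:
    explicit SimpleBarrier(std::size_t parties) : parties_(parties) {}

    // Blocks until `parties` threads have called this, then releases them all.
    void arrive_and_wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        const std::size_t my_generation = generation_;
        if (++waiting_ == parties_) {
            waiting_ = 0;          // last arrival: start a new generation
            ++generation_;
            released_.notify_all();
        } else {
            released_.wait(lock, [&] { return generation_ != my_generation; });
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable released_;
    std::size_t parties_;
    std::size_t waiting_ = 0;
    std::size_t generation_ = 0;
};

// One writer (the calling thread) and `readers` readers run `rounds` rounds.
// Returns, per reader, the sum of all values of p that reader processed.
std::vector<long> run_lockstep(int readers, int rounds) {
    SimpleBarrier sync(static_cast<std::size_t>(readers) + 1);
    int p = 0;  // shared value; the barrier phases keep writes and reads apart
    std::vector<long> sums(readers, 0);
    std::vector<std::thread> pool;
    for (int r = 0; r < readers; ++r) {
        pool.emplace_back([&, r] {
            for (int i = 0; i < rounds; ++i) {
                sync.arrive_and_wait();  // wait until the writer has set p
                sums[r] += p;            // process p
                sync.arrive_and_wait();  // report that this round is finished
            }
        });
    }
    for (int i = 0; i < rounds; ++i) {
        p = i;                   // no reader touches p until the next barrier
        sync.arrive_and_wait();  // release the readers to read p
        sync.arrive_and_wait();  // wait for every reader to finish the round
    }
    for (auto& t : pool) {
        t.join();
    }
    return sums;
}
```

With run_lockstep(3, 10), every reader processes each of the values 0 through 9 exactly once, so each entry of the returned vector is their sum. The cost of this design is that the writer runs only as fast as the slowest reader.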

Macaw answered 1/7, 2011 at 4:53 Comment(0)
1

The following code demonstrates how the producer/consumer pattern can be implemented with future and promise.

There are two promise variables, used by a producer and a consumer thread. Each thread resets one of the two promise variables and waits for the other one.

#include <iostream>
#include <future>
#include <thread>
using namespace std;

// produces integers from 0 to 99
void producer(promise<int>& dataready, promise<void>& consumed)
{
    for (int i = 0; i < 100; ++i) {
        // do some work here ...
        consumed = promise<void>{};      // reset
        dataready.set_value(i);          // make data available
        consumed.get_future().wait();    // wait for the data to be consumed
    
    }
    dataready.set_value(-1);                     // no more data
}

// consumes integers
void consumer(promise<int>& dataready, promise<void>& consumed)
{
    for (;;) {
        int n = dataready.get_future().get();    // wait for data ready
        if (n >= 0) {
            std::cout << n << ",";
            dataready = promise<int>{};  // reset
            consumed.set_value();        // mark data as consumed
            // do some work here ...
        }
        else
            break;
    }
}

int main(int argc, const char*argv[])
{
    promise<int> dataready{};
    promise<void> consumed{};

    thread th1([&] {producer(dataready, consumed); });
    thread th2([&] {consumer(dataready, consumed); });

    th1.join();
    th2.join();
    std::cout  << "\n";

    return 0;
}
Thymus answered 24/1, 2022 at 17:31 Comment(0)
0

This answer supports, and adds a small clarification to, the answers given by Jeff Wofford, Managu and Fabio.

Jeff's and Fabio's solutions work. But just to make it clear, what is happening in the solutions is that the so-called reset of the promise is not resetting the promise per se, but move-assigning a new promise into the storage that holds the promise variable. In Fabio's solution this storage is on the stack in main, and in Jeff's solution it is a global variable. In each iteration, the new promise is a completely different promise with a new shared state, and any future retrieved from the previous promise does not refer to this new shared state.

So these solutions are not exactly resetting, but recycling the promise. A new shared state is allocated, and the old one released, in each iteration.
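
The point about separate shared states can be checked with a small single-threaded sketch. The function name old_future_value_after_recycle is hypothetical and exists only for this illustration.

```cpp
#include <future>

// Returns the value seen through a future that was obtained BEFORE the
// promise variable was recycled by move-assignment.
int old_future_value_after_recycle() {
    std::promise<int> p;
    std::future<int> old_future = p.get_future();  // bound to the first shared state
    p.set_value(1);                                // first shared state now holds 1

    p = std::promise<int>{};   // "reset": p now owns a brand-new shared state
    p.set_value(2);            // stored in the new state only

    return old_future.get();   // still reads from the first shared state
}
```

The old future keeps returning the value stored in the first shared state, so a reader holding it never observes anything set after the "reset"; it has to call get_future() on the recycled promise to see the next value.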

As mentioned by Managu,

A promise/future pair is designed to carry only a single value (or exception).

It serves as a single-shot asynchronous data communication mechanism.
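
A short sketch of what "single-shot" means in practice: the standard library rejects a second set_value() and a second get_future() on the same promise by throwing std::future_error. The two function names below are hypothetical helpers for this illustration.

```cpp
#include <future>

// Returns true if a second set_value() on the same promise is rejected.
bool second_set_value_throws() {
    std::promise<int> p;
    p.set_value(1);
    try {
        p.set_value(2);  // the shared state already holds a value
    } catch (const std::future_error& e) {
        return e.code() == std::future_errc::promise_already_satisfied;
    }
    return false;
}

// Returns true if a second get_future() on the same promise is rejected.
bool second_get_future_throws() {
    std::promise<int> p;
    std::future<int> first = p.get_future();
    try {
        std::future<int> second = p.get_future();  // only one future per promise
    } catch (const std::future_error& e) {
        return e.code() == std::future_errc::future_already_retrieved;
    }
    return false;
}
```

Both functions are expected to return true, which is why each iteration of the earlier solutions needs a fresh promise rather than a reused one.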

Levant answered 12/6 at 12:50 Comment(0)
