I have a total n00b question here on synchronization. I have a 'writer' thread which assigns a different value 'p' to a promise at each iteration. I need 'reader' threads which wait for shared_futures of this value and then process them, and my question is how do I use future/promise to ensure that the reader threads wait for a new update of 'p' before performing their processing task at each iteration? Many thanks.
You can "reset" a promise by move-assigning a fresh, default-constructed promise to it.
myPromise = promise< int >();
A more complete example:
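#include <chrono>
#include <future>
#include <iostream>
#include <thread>
using namespace std;
promise< int > myPromise; // shared by both threads, with no locking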
void writer()
{
for( int i = 0; i < 10; ++i )
{
cout << "Setting promise.\n";
myPromise.set_value( i );
myPromise = promise< int >{}; // Reset the promise.
cout << "Waiting to set again...\n";
this_thread::sleep_for( chrono::seconds( 1 ));
}
}
void reader()
{
int result;
do
{
auto myFuture = myPromise.get_future();
cout << "Waiting to receive result...\n";
result = myFuture.get();
cout << "Received " << result << ".\n";
} while( result < 9 );
}
int main()
{
std::thread write( writer );
std::thread read( reader );
write.join();
read.join();
return 0;
}
A problem with this approach, however, is that nothing coordinates the two threads, so depending on timing the writer can call promise::set_value() more than once between the reader's calls to future::get(), or future::get() can be called while the promise is being reset. These problems can be avoided with care (e.g. with proper sleeping between calls), but this takes us into the realm of hacking and guesswork rather than logically correct concurrency.
So although it's possible to reset a promise by assigning it to a fresh promise, doing so tends to raise broader synchronization issues.
A promise/future pair is designed to carry only a single value (or exception). To do what you're describing, you probably want to adopt a different tool.
If you wish to have multiple threads (your readers) all stop at a common point, you might consider a barrier.
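As one illustration of what a "different tool" could look like, here is a minimal sketch that uses a mutex, a condition variable and a generation counter instead of a promise or a barrier. This is a swapped-in technique, not something the answer above prescribes, and the names `Latest`, `publish`, `wait_next` and `run_demo` are hypothetical. It assumes readers only need the most recent value of 'p': a slow reader can skip intermediate updates.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper type: holds the latest value of 'p' plus a generation
// counter that increases on every update.
struct Latest {
    std::mutex m;
    std::condition_variable cv;
    int value = 0;
    unsigned generation = 0;  // 0 means "nothing published yet"

    // Writer side: store a new value and wake every waiting reader.
    void publish(int v) {
        {
            std::lock_guard<std::mutex> lock(m);
            value = v;
            ++generation;
        }
        cv.notify_all();
    }

    // Reader side: block until a generation other than `seen` exists,
    // then return its value and record that generation in `seen`.
    int wait_next(unsigned& seen) {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [&] { return generation != seen; });
        seen = generation;
        return value;
    }
};

// Demo: one writer publishes 1..last; each reader keeps waiting for updates
// until it has observed `last`. Returns how many readers finished.
int run_demo(int readers, int last) {
    Latest latest;
    std::atomic<int> finished{0};
    std::vector<std::thread> pool;
    for (int r = 0; r < readers; ++r) {
        pool.emplace_back([&] {
            unsigned seen = 0;
            int prev = 0;
            for (;;) {
                int v = latest.wait_next(seen);
                assert(v > prev);  // every wake-up delivers a newer value
                prev = v;
                if (v == last) break;
            }
            ++finished;
        });
    }
    std::thread writer([&] {
        for (int i = 1; i <= last; ++i) latest.publish(i);
    });
    writer.join();
    for (auto& t : pool) t.join();
    return finished.load();
}
```

Calling `run_demo(3, 10)` from `main` starts three readers and one writer. Each reader keeps its own `seen` counter, so it never processes the same update twice and never has to guess when the writer is done resetting anything.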
The following code demonstrates how the producer/consumer pattern can be implemented with future and promise.
There are two promise variables, used by a producer and a consumer thread. Each thread resets one of the two promise variables and waits for the other one.
#include <iostream>
#include <future>
#include <thread>
using namespace std;
// produces integers from 0 to 99
void producer(promise<int>& dataready, promise<void>& consumed)
{
for (int i = 0; i < 100; ++i) {
// do some work here ...
consumed = promise<void>{}; // reset
dataready.set_value(i); // make data available
consumed.get_future().wait(); // wait for the data to be consumed
}
dataready.set_value(-1); // no more data
}
// consumes integers
void consumer(promise<int>& dataready, promise<void>& consumed)
{
for (;;) {
int n = dataready.get_future().get(); // wait for data ready
if (n >= 0) {
std::cout << n << ",";
dataready = promise<int>{}; // reset
consumed.set_value(); // mark data as consumed
// do some work here ...
}
else
break;
}
}
int main(int argc, const char*argv[])
{
promise<int> dataready{};
promise<void> consumed{};
thread th1([&] {producer(dataready, consumed); });
thread th2([&] {consumer(dataready, consumed); });
th1.join();
th2.join();
std::cout << "\n";
return 0;
}
This answer supports, and adds a tiny bit of clarification to, the answers given by Jeff Wofford, Managu and Fabio.
Jeff's and Fabio's solutions work. But just to make it clear, the so-called reset in those solutions does not reset the promise per se; it move-assigns a new promise into the storage that was allocated for the old one. In Fabio's solution that storage is on the stack in main, and in Jeff's solution it is a global variable. In each iteration the new promise is a completely different promise with a new shared state, and any future retrieved from the previous promise does not refer to this new shared state.
So these solutions are not exactly resetting, but recycling the promise. New shared state allocation and de-allocation is happening in each iteration.
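The recycling described above can be observed in a single thread. The following is a minimal sketch (the function name `recycled_promise_has_new_state` is made up for illustration): a future taken before the "reset" keeps the old value, while the same promise variable hands out a second, unrelated future afterwards.

```cpp
#include <cassert>
#include <future>

// Returns true if a future taken before the "reset" still reports the old
// value while the recycled promise provides a second, independent future.
bool recycled_promise_has_new_state() {
    std::promise<int> p;
    std::future<int> first = p.get_future();
    p.set_value(1);

    p = std::promise<int>{};  // "reset": move-assign a fresh promise

    // A second get_future() on the same shared state would throw
    // std::future_error (future_already_retrieved). It succeeds here
    // because p now owns a brand-new shared state.
    std::future<int> second = p.get_future();
    p.set_value(2);

    assert(first.valid() && second.valid());
    int a = first.get();   // still the value stored in the old state
    int b = second.get();  // the value stored in the new state
    return a == 1 && b == 2;
}
```

Note that the old value was set before the reassignment. If the old promise were replaced without ever being satisfied, a thread waiting on `first` would get a `broken_promise` error instead of a value.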
As mentioned by Managu, "A promise/future pair is designed to carry only a single value (or exception)."
It serves as a single shot asynchronous data communication mechanism.
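The single-shot behaviour is easy to check: a second `set_value()` on the same promise is rejected by the library. A small sketch, with a hypothetical function name:

```cpp
#include <cassert>
#include <future>

// Returns true if a second set_value() on the same promise is rejected
// and the first value is still the one delivered to the future.
bool second_set_value_is_rejected() {
    std::promise<int> p;
    std::future<int> f = p.get_future();
    p.set_value(1);
    assert(f.valid());
    try {
        p.set_value(2);  // same shared state, already satisfied
    } catch (const std::future_error& e) {
        // The failed second call leaves the stored value untouched.
        return e.code() == std::future_errc::promise_already_satisfied
               && f.get() == 1;
    }
    return false;  // not reached on a conforming implementation
}
```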