Consider the following example code, where thread A pushes functions onto a queue and thread B executes them as it pops them from the queue:
std::atomic<uint32_t> itemCount;

// Executed by thread A
void run(std::function<void()> function) {
    if (queue.push(std::move(function))) {
        itemCount.fetch_add(1, std::memory_order_acq_rel);
        itemCount.notify_one();
    }
}

// Executed by thread B
void threadMain() {
    std::function<void()> function;
    while (true) {
        if (queue.pop(function)) {
            itemCount.fetch_sub(1, std::memory_order_acq_rel);
            function();
        } else {
            itemCount.wait(0, std::memory_order_acquire);
        }
    }
}
where queue is a concurrent queue with a push and a pop function, each returning a bool indicating whether the operation was successful: push returns false if the queue is full and pop returns false if it is empty.
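For reference, a mutex-based stand-in with the same push/pop contract would look roughly like this. It is only here to make the example self-contained; the real queue is lock-free and uses only atomic operations (see the EDIT below):

#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Stand-in with the same contract as the queue in the question:
// push fails when the queue is full, pop fails when it is empty.
template <class T>
class ConcurrentQueue {
public:
    explicit ConcurrentQueue(std::size_t capacity) : capacity_(capacity) {}

    bool push(T&& value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.size() >= capacity_) return false;   // full
        items_.push_back(std::move(value));
        return true;
    }

    bool pop(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return false;                // empty
        out = std::move(items_.front());
        items_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::deque<T> items_;
    std::size_t capacity_;
};

ConcurrentQueue<std::function<void()>> queue{1024};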
Now I'm wondering whether this code is thread-safe under all circumstances.
Let's suppose thread B's pop fails and it is about to invoke std::atomic<T>::wait. At the same time, thread A pushes a new element while thread B performs wait's initial check of the condition. Since itemCount hasn't been incremented yet, the check still sees 0, so thread B decides it has to block.
Immediately after that, thread A increments the counter and calls notify_one, although thread B isn't actually waiting inside the OS yet. Thread B then finally blocks on the atomic and, because of the lost notification, never wakes up again despite there being an element in the queue. It only resumes once another element is pushed onto the queue, which notifies B and lets it continue.
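Spelled out as a step-by-step interleaving (the step numbering is mine; the calls are the ones from the snippets above):

// 1. thread B: queue.pop(function) returns false - the queue is empty
// 2. thread B: wait(0) performs its initial check and still reads itemCount == 0,
//              so it decides it has to block
// 3. thread A: queue.push(std::move(function)) returns true
// 4. thread A: itemCount.fetch_add(1, std::memory_order_acq_rel) - itemCount is now 1
// 5. thread A: itemCount.notify_one() - but thread B is not blocked in the OS yet
// 6. thread B: actually blocks inside wait(0) and, if the notification from step 5
//              is really lost, sleeps although the queue holds an element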
I wasn't able to reproduce this situation manually since the timing is close to impossible to get right.
Is this a serious concern, or can it not happen at all? What (preferably atomic) alternatives exist to account for such rare situations?
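For the "alternatives" part: one option would be to fold the counter into a std::counting_semaphore, which combines the count, the blocking and the wakeup in a single primitive. A rough sketch with the same queue as above (untested, just to show the direction; itemsAvailable replaces itemCount):

#include <functional>
#include <semaphore>
#include <utility>

std::counting_semaphore<> itemsAvailable{0};

// Executed by thread A
void run(std::function<void()> function) {
    if (queue.push(std::move(function))) {
        itemsAvailable.release();        // wakes at most one blocked consumer
    }
}

// Executed by thread B
void threadMain() {
    std::function<void()> function;
    while (true) {
        itemsAvailable.acquire();        // blocks until a matching release()
        if (queue.pop(function)) {       // expected to succeed: one release per push
            function();
        }
    }
}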
EDIT: Just to mention, the queue is non-blocking and only uses atomic operations.
The reason I'm asking is that I don't understand how it's possible to implement an atomic wait operation. Although the standard says the whole operation is atomic (consisting of a load, a predicate check and the wait itself), in the implementation I'm using, std::atomic<T>::wait is implemented roughly as follows:
void wait(const _TVal _Expected, const memory_order _Order = memory_order_seq_cst) const noexcept {
    _Atomic_wait_direct(this, _Atomic_reinterpret_as<long>(_Expected), _Order);
}
where _Atomic_wait_direct is defined as
template <class _Ty, class _Value_type>
void _Atomic_wait_direct(
    const _Atomic_storage<_Ty>* const _This, _Value_type _Expected_bytes, const memory_order _Order) noexcept {
    const auto _Storage_ptr = _STD addressof(_This->_Storage);
    for (;;) {
        const _Value_type _Observed_bytes = _Atomic_reinterpret_as<_Value_type>(_This->load(_Order));
        if (_Expected_bytes != _Observed_bytes) {
            return;
        }
        __std_atomic_wait_direct(_Storage_ptr, &_Expected_bytes, sizeof(_Value_type), _Atomic_wait_no_timeout);
    }
}
We can clearly see that there is an atomic load with the specified memory order to check the state of the atomic itself. However, I don't see how the whole operation can be considered atomic, since there is a comparison right before the call to __std_atomic_wait_direct.
With condition variables, the predicate itself is secured by a mutex, but how is the atomic itself secured here?
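For comparison, this is the condition-variable pattern I have in mind, where the mutex is what ties the predicate check to the sleep. This is my own sketch with made-up names (itemMutex, guardedItemCount, signalItem, waitForItem), not related to the STL code above:

#include <condition_variable>
#include <cstdint>
#include <mutex>

std::mutex itemMutex;
std::condition_variable itemCondition;
std::uint32_t guardedItemCount = 0;   // protected by itemMutex

// Producer side
void signalItem() {
    {
        std::lock_guard<std::mutex> lock(itemMutex);
        ++guardedItemCount;           // the predicate changes under the lock
    }
    itemCondition.notify_one();
}

// Consumer side
void waitForItem() {
    std::unique_lock<std::mutex> lock(itemMutex);
    // The check and the transition to sleeping happen while holding the lock,
    // so a notify_one() issued after the increment cannot be missed.
    itemCondition.wait(lock, [] { return guardedItemCount != 0; });
}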
memory_order_acquire and memory_order_acq_rel are the possible problem here. Without them, your described problem does not happen. With them, I can't tell you. And if you cannot prove your code correct without those flags, don't use those flags. – Eldwin

Is memory_order_seq_cst safe in this case? – Heid

Since atomic has condition-variable-like APIs in C++20, the answer is this: Difference between std::atomic and std::condition_variable wait, notify_* methods. – Omor

You're using std::atomic<uint32_t> as a counting semaphore. I'd just use a std::counting_semaphore instead. I'd also build that into the queue itself, so push and pop deal with it and the outside world doesn't. As an aside, I'd also pass a timeout to the pop, so it'll automatically wait up to some specified amount of time before failing. – Adorl

I can reason about memory_order_seq_cst, so I can show that your scenario cannot happen; I mean, read how wait works. I cannot reliably reason about acquire or acq_rel, so I cannot say anything of note (I do not fully understand how they work). I claim that if you cannot see that memory_order_seq_cst is safe, then maybe you shouldn't be using the more exotic memory model flags. – Eldwin

I see how memory_order_seq_cst works, but still, looking at the implementation of wait, I don't think it can guarantee this can't happen. Actually, this is not a matter of atomicity itself; as I said, the first load in wait (the predicate) could receive 0, then fetch_add sets it to 1, so the notify could still be issued before the wait reaches the OS. I might be horribly off, but I want to justify at least why I think memory ordering isn't the main issue. – Heid

wait is an atomic operation, and so is fetch_add. They don't interleave, but execute in some global order. If wait happens to execute first, then it would be followed by fetch_add and notify_one, which would wake it up. If fetch_add happens to execute first, then by the time wait is executed, itemCount.load() != 0 and wait will return immediately. You think of wait as executing in steps - check the value of the atomic, then enter the wait - but that's as wrong as thinking that fetch_add fetches first then adds, and worrying that another operation could squeeze in between. – Ally

With an x.fetch_add(1, memory_order_seq_cst) followed by a y.load(memory_order_relaxed), the load can be reordered in between the load and store of the fetch_add(). The x.fetch_add() is atomic in the sense that no stores to x can come between its load and store, and that other observers will not see intermediate values for x, but AFAIK the atomicity isn't related to its ordering. – Discerning

itemCount.notify_one() could also be reordered in between the load and store of itemCount.fetch_add(), which would appear to cause this algorithm to deadlock. – Discerning

If queue.pop returns true, you don't need that itemCount at all. – Hautegaronne

Waits on std::atomic default to a short spin, then revert to something less busy (e.g. libstdc++ from gcc). Of course there is no guarantee, but I prefer a solution that at least allows non-spinning too. – Heid

… pop for a non-blocking queue. – Hautegaronne
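To make the point from the comments concrete: a toy model of the kind of primitive __std_atomic_wait_direct presumably hands off to (a futex- or WaitOnAddress-style wait). This is only my sketch of the assumed semantics, not the real implementation, and the names are made up:

#include <condition_variable>
#include <cstddef>
#include <cstring>
#include <mutex>

// Toy model only: a single global lock and condition variable stand in for the
// per-address wait queues a real futex / WaitOnAddress implementation keeps.
std::mutex waitLock;
std::condition_variable waitCondition;

void toy_wait_on_address(const void* address, const void* expected, std::size_t size) {
    std::unique_lock<std::mutex> lock(waitLock);
    // The value check and the transition to sleeping happen while holding the
    // lock that a notifier must also take, so a notification issued after the
    // value change can never slip in between the check and the sleep.
    if (std::memcmp(address, expected, size) != 0) {
        return;                        // value already changed, don't sleep
    }
    waitCondition.wait(lock);          // spurious wakeups are fine, the caller loops
}

void toy_notify_address(const void* /*address*/) {
    std::lock_guard<std::mutex> lock(waitLock);   // serializes with the check above
    waitCondition.notify_all();
}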