C++20: How to wait on an atomic object with timeout?

C++20 std::atomic has wait and notify_* member functions, but no wait_for/wait_until.

The Microsoft STL implementation of std::atomic uses WaitOnAddress (when the OS is new enough to have it), and this API takes a dwMilliseconds parameter as a timeout value. So from a standard library writer's standpoint, I think the missing functions are easily implementable (at least on Windows 8 or newer). I just wonder why they are not in C++20.

But as a (portable) user-code writer, I have to emulate the behavior with a standard semaphore and an atomic counter. So here's the code:

#include <atomic>
#include <compare>
#include <concepts>
#include <cstddef>
#include <cstring>
#include <semaphore>
#include <type_traits>

namespace detail
{
    template <std::size_t N>
    struct bytes
    {
        unsigned char space[N];
        auto operator<=>(bytes const &) const = default;
    };

    //Compare by value representation, as requested by C++20.
    //The implementation is a bit awkward.
    //Hypothetically `std::atomic<T>::compare(T, T)` would be helpful. :)
    template <std::integral T>
    bool compare(T a, T b) noexcept
    {
        static_assert(std::has_unique_object_representations_v<T>);
        return a == b;
    }
    template <typename T>
    requires(std::has_unique_object_representations_v<T> && !std::integral<T>)
    bool compare(T a, T b) noexcept
    {
        bytes<sizeof(T)> aa, bb;
        std::memcpy(aa.space, &a, sizeof(T));
        std::memcpy(bb.space, &b, sizeof(T));
        return aa == bb;
    }
    template <typename T>
    requires(!std::has_unique_object_representations_v<T>)
    bool compare(T a, T b) noexcept
    {
        std::atomic<T> aa{ a };
        auto equal = aa.compare_exchange_strong(b, b, std::memory_order_relaxed);
        return equal;
    }

    template <typename T>
    class atomic_with_timed_wait
        : public std::atomic<T>
    {
    private:
        using base_atomic = std::atomic<T>;
        std::counting_semaphore<> mutable semaph{ 0 };
        std::atomic<std::ptrdiff_t> mutable notify_demand{ 0 };
    public:
        using base_atomic::base_atomic;
    public:
        void notify_one() /*noexcept*/
        {
            auto nd = notify_demand.load(std::memory_order_relaxed);
            if (nd <= 0)
                return;
            notify_demand.fetch_sub(1, std::memory_order_relaxed);
            semaph.release(1);//may throw
        }
        void notify_all() /*noexcept*/
        {
            auto nd = notify_demand.exchange(0, std::memory_order_relaxed);
            if (nd > 0)
            {
                semaph.release(nd);//may throw
            }
            else if (nd < 0)
            {
                //Overly released. Put it back.
                notify_demand.fetch_add(nd, std::memory_order_relaxed);
            }
        }
        void wait(T old, std::memory_order order = std::memory_order::seq_cst) const /*noexcept*/
        {
            for (;;)
            {
                T const observed = base_atomic::load(order);
                if (false == compare(old, observed))
                    return;

                notify_demand.fetch_add(1, std::memory_order_relaxed);

                semaph.acquire();//may throw
                //Acquired.
            }
        }
        template <typename TPoint>
        bool wait_until(T old, TPoint const & abs_time, std::memory_order order = std::memory_order::seq_cst) const /*noexcept*/
        //Returns true if the value differs from `old`; false on timeout.
        {
            for (;;)
            {
                T const observed = base_atomic::load(order);
                if (false == compare(old, observed))
                    return true;

                notify_demand.fetch_add(1, std::memory_order_relaxed);

                if (semaph.try_acquire_until(abs_time))//may throw
                {
                    //Acquired.
                    continue;
                }
                else
                {
                    //Not acquired and timeout.
                    //This might happen even if semaph has positive release counter.
                    //Just cancel demand and return.
                    //Note that this might give notify_demand a negative value,
                    //  which means the semaph is overly released.
                    //Subsequent acquire on semaph would just succeed spuriously.
                    //So it should be OK.
                    notify_demand.fetch_sub(1, std::memory_order_relaxed);
                    return false;
                }
            }
        }
        //TODO: bool wait_for()...
    };
}
using detail::atomic_with_timed_wait;

I am just not sure whether it's correct. So, is there any problem in this code?
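
For context on why `compare` is split on `std::has_unique_object_representations_v`, here is a minimal sketch of what that trait reports and how a plain bytewise comparison can be fooled by padding. The types `Packed` and `Padded` and the helpers `same_bytes` and `padding_can_break_memcmp` are hypothetical names for illustration, and the `static_assert`s assume a mainstream ABI where `alignof(int) > 1`.

```cpp
#include <cstring>
#include <type_traits>

// Hypothetical types, for illustration only.
struct Packed { int a; int b; };   // no padding on common ABIs
struct Padded { char c; int i; };  // padding after `c` on common ABIs

// Assumption: a mainstream ABI where alignof(int) > 1.
static_assert(std::has_unique_object_representations_v<int>);
static_assert(std::has_unique_object_representations_v<Packed>);
static_assert(!std::has_unique_object_representations_v<Padded>);

// Bytewise comparison of the full object representation,
// padding bytes included.
template <class T>
bool same_bytes(const T& a, const T& b) noexcept {
    static_assert(std::is_trivially_copyable_v<T>);
    return std::memcmp(&a, &b, sizeof(T)) == 0;
}

// Builds two objects whose members are equal but whose padding
// bytes were filled differently. On typical compilers the bytewise
// comparison then reports a difference; the standard does not
// guarantee what padding bytes hold, so treat the result as
// typical behavior rather than a promise.
bool padding_can_break_memcmp() {
    Padded x, y;
    std::memset(&x, 0x00, sizeof x);
    std::memset(&y, 0xFF, sizeof y);
    x.c = y.c = 'a';
    x.i = y.i = 42;
    return !same_bytes(x, y) && x.c == y.c && x.i == y.i;
}
```

This is why a `memcpy`-based comparison is only used above when the type has unique object representations; for padded types something that ignores padding is needed.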

Corinecorinna answered 21/10, 2021 at 10:20

Timed waiting APIs (try_wait, wait_for, and wait_until) for std::atomic are proposed in P2643, targeting C++26. libstdc++ has already implemented the underlying support for these operations in its internal header <bits/atomic_timed_wait.h>. Note that these facilities are also used to implement timed waits for std::counting_semaphore, which is essentially a more constrained std::atomic. Before the paper is merged, there are at least two portable ways to emulate timed operations:

  1. A pair of mutex and condition variable: These two can be combined to provide universal timed waiting functionality. For example, std::condition_variable_any could be implemented using a pair of std::mutex and std::condition_variable (N2406). A pair of std::atomic and std::counting_semaphore, as in your code, may also be feasible, but I found it a bit awkward since std::counting_semaphore doesn't have a notify_all operation, which introduces extra complexity. A straightforward prototype could be as follows (Godbolt):

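    #include <atomic>
    #include <chrono>
    #include <condition_variable>
    #include <cstring>
    #include <memory>
    #include <mutex>
    using namespace std;
    using namespace std::chrono;

    // NOTE: volatile overloads are not supported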
    template <class T> struct timed_atomic : atomic<T> {
      using atomic<T>::atomic;
      bool try_wait(T old, memory_order order = seq_cst) const noexcept {
        T value = this->load(order);
        // TODO: Ignore padding bits in comparison
        // Returns true if the current value differs from `old`
        return memcmp(addressof(value), addressof(old), sizeof(T)) != 0;
      }
      void wait(T old, memory_order order = seq_cst) const {
        unique_lock lock(mtx);
        // Load with the caller's memory order so `order` is honored
        cond.wait(lock, [=, this]() { return try_wait(old, order); });
      }
      template <class Rep, class Period>
      bool wait_for(T old, const duration<Rep, Period> &rel_time,
                    memory_order order = seq_cst) const {
        unique_lock lock(mtx);
        return cond.wait_for(lock, rel_time,
                             [=, this]() { return try_wait(old, order); });
      }
      template <class Clock, class Duration>
      bool wait_until(T old, const time_point<Clock, Duration> &abs_time,
                      memory_order order = seq_cst) const {
        unique_lock lock(mtx);
        return cond.wait_until(lock, abs_time,
                               [=, this]() { return try_wait(old, order); });
      }
      void notify_one() const {
        { lock_guard _(mtx); }
        cond.notify_one();
      }
      void notify_all() const {
        { lock_guard _(mtx); }
        cond.notify_all();
      }
    private:
      mutable mutex mtx;
      mutable condition_variable cond;
      using enum memory_order;
    };
    

    As you can see above, one downside of this approach is that volatile overloads of member functions are not supported since std::mutex and std::condition_variable themselves don't support volatile. One workaround is to store them in a separate table outside the timed_atomic and hash addresses to get the corresponding pairs. libstdc++ has implemented something similar when the platform doesn't support native atomic waiting operations (Thomas).

    A more subtle problem is that the standard requires wait to compare value representations (i.e., excluding padding bits) for equality instead of object representations ([atomics.types.operations] p30.1). For now, this can't be easily implemented in a portable way and needs compiler support (e.g., __builtin_clear_padding in GCC).

  2. Polling with timed backoff: This approach is more lightweight as it doesn't require additional synchronization facilities. The downside is that polling is usually more expensive than waiting when the notification takes a long time to arrive. One potential advantage of polling is that it honors adjustments to the user-provided Clock. An example implementation is as follows (Godbolt):

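    #include <atomic>
    #include <chrono>
    #include <cstring>
    #include <memory>
    #include <thread>
    using namespace std;
    using namespace std::chrono;
    using namespace std::this_thread;

    template <class T> struct timed_atomic : atomic<T> {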
      using atomic<T>::atomic;
      bool try_wait(T old, memory_order order = seq_cst) const noexcept {
        T value = this->load(order);
        // TODO: Ignore padding bits in comparison
        // Returns true if the current value differs from `old`
        return memcmp(addressof(value), addressof(old), sizeof(T)) != 0;
      }
      template <class Rep, class Period>
      bool wait_for(T old, const duration<Rep, Period> &rel_time,
                    memory_order order = seq_cst) const {
        return wait_until(old, steady_clock::now() + rel_time, order);
      }
      template <class Clock, class Duration>
      bool wait_until(T old, const time_point<Clock, Duration> &abs_time,
                      memory_order order = seq_cst) const {
        while (!try_wait(old, order)) {
          if (Clock::now() >= abs_time)
            return false;
          sleep_for(100ms);
        }
        return true;
      }
      // NOTE: volatile overloads are omitted
    private:
      using enum memory_order;
    };
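
The polling loop above sleeps for a fixed 100ms, so it can overshoot the deadline and is slow to notice an early change. As a possible variation, here is a minimal sketch of polling with exponential backoff where each sleep is clamped to the time remaining. The free function `poll_until_changed` and its `first_sleep`/`max_sleep` parameters are hypothetical names, not part of the standard or P2643, and the sketch compares with `operator!=` rather than by value representation, so it assumes a type such as an integer.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical helper: polls `a` until its value differs from `old`
// or `deadline` passes. Returns true if a change was observed,
// false on timeout. Assumes T supports operator!= (e.g. integers).
template <class T, class Clock, class Duration>
bool poll_until_changed(
    const std::atomic<T>& a, T old,
    const std::chrono::time_point<Clock, Duration>& deadline,
    std::chrono::microseconds first_sleep = std::chrono::microseconds(50),
    std::chrono::microseconds max_sleep = std::chrono::microseconds(10000)) {
  assert(first_sleep.count() > 0 && max_sleep >= first_sleep);
  std::chrono::microseconds sleep = first_sleep;
  for (;;) {
    // Check the value first so a change is reported even if the
    // deadline has already passed.
    if (a.load(std::memory_order_acquire) != old)
      return true;
    const auto now = Clock::now();
    if (now >= deadline)
      return false;
    // Never sleep past the deadline.
    const auto remaining =
        std::chrono::duration_cast<std::chrono::microseconds>(deadline - now);
    std::this_thread::sleep_for(std::min(sleep, remaining));
    // Double the sleep each round, up to the cap.
    sleep = std::min(sleep * 2, max_sleep);
  }
}
```

A call might look like `poll_until_changed(flag, 0, std::chrono::steady_clock::now() + std::chrono::milliseconds(20))`. Short initial sleeps keep latency low when the change arrives quickly, while the cap bounds the cost of a long wait; the specific 50µs and 10ms defaults are arbitrary choices for the sketch.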
    
Patrizia answered 27/5 at 12:32
