C++20 semaphore in queue application seems slow compared to condition variable
For study purposes, I'm comparing implementations of single-producer single-consumer queues, specifically a condition variable implementation against a C++20 counting semaphore implementation. I would have guessed that the semaphore implementation would be faster, but that is not the case: on Windows with MSVC, on my machine, the semaphore implementation is about 25% slower. I've included both implementations below.

The condition variable implementation has a small functional advantage: operations can be aborted through the done() API function, while the semaphore implementation requires a special 'stop' value to be queued to unlock and exit the pulling thread. I had imagined that a single-producer single-consumer queue was a typical application for semaphores, but apparently not.

Now I wonder:

  • Did I do something unwise that makes my semaphore implementation needlessly slow?
  • Is the Microsoft counting semaphore implementation perhaps too slow?
  • Or do requirements in the C++ standard make semaphores slow in general?
  • Am I simply mistaken that a queue is a proper application for semaphores?
  • If a queue is not a proper application, in what other application does a semaphore outperform a condition variable?

Condition variable implementation:

#include <array>
#include <mutex>
#include <condition_variable>

/*
* locked_single_producer_single_consumer_queue_T is responsible for locked packet communication
* between 2 threads. One thread pushes, the other thread pulls.
*/
template<class T, int N = 16> // N must be a power of 2
class locked_single_producer_single_consumer_queue_T
{
public:
    /* When the packet fits in the queue, push shall return immediately. Otherwise it blocks until it can push the packet. */
    void push(T const& packet)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] {return ((m_tail - m_head) & m_mask) != 1; });
        m_data[m_head++] = packet;
        m_head &= m_mask;
        lock.unlock();
        m_cv.notify_one();
    }
    /* When a packet can be retrieved from the queue, pull shall return immediately. Otherwise it blocks until it can pull a packet. */
    bool pull(T& packet)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] {return (((m_head - m_tail) & m_mask) != 0) || m_done; });
        if(((m_head - m_tail) & m_mask) != 0) [[likely]]
        {
            packet = m_data[m_tail++];
            m_tail &= m_mask;
            lock.unlock();
            m_cv.notify_one();
            return true;
        }
        return false;
    }
    /* done() indicates that the pushing thread stopped. The pulling thread can continue reading
       the remainder of the queue and should then return */
    void done()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
        }
        m_cv.notify_one();
    }
private:
    static_assert((N & (N - 1)) == 0, "N must be a power of 2");
    static signed int const m_mask = N - 1;
    using data_t = std::array<T, N>;
    data_t m_data;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    int m_tail{ 0 };
    int m_head{ 0 };
    bool m_done{};
};

Semaphore implementation:

#include <array>
#include <semaphore>
#include <atomic>

/*
* locked_single_producer_single_consumer_queue2_T is responsible for locked packet communication
* between 2 threads. One thread pushes, the other thread pulls.
*/
template<class T, int N = 16> // N must be a power of 2
class locked_single_producer_single_consumer_queue2_T
{
public:
    /* When the packet fits in the queue, push shall return immediately. Otherwise it blocks until it can push the packet. */
    void push(T const& packet)
    {
        m_available_space.acquire();
        int head = m_head.load(std::memory_order_acquire);
        m_data[head++ & m_mask] = packet;
        m_head.store(head, std::memory_order_release);
        m_available_packages.release();
    }
    /* When a packet can be retrieved from the queue, pull shall return immediately. Otherwise it blocks until it can pull a packet. */
    T pull()
    {
        m_available_packages.acquire();
        int tail = m_tail.load(std::memory_order_acquire);
        T packet = m_data[tail++ & m_mask];
        m_tail.store(tail, std::memory_order_release);
        m_available_space.release();
        return packet;
    }
private:
    static_assert((N & (N - 1)) == 0, "N must be a power of 2");
    static signed int const m_mask = N - 1;
    using data_t = std::array<T, N>;
    data_t m_data;
    std::atomic_int m_tail{ 0 };
    std::atomic_int m_head{ 0 };
    std::counting_semaphore<N> m_available_space{ N };
    std::counting_semaphore<N> m_available_packages{ 0 };
};

*** EDIT ***

Upon request, I've also included a complete test program below. It already contains both implementations. (It needs C++20 with `<semaphore>`.)

#include <array>
#include <mutex>
#include <condition_variable>
#include <semaphore>
#include <atomic>
#include <iostream>
#include <vector>
#include <algorithm>
#include <future>

/*
* locked_single_producer_single_consumer_queue_T is responsible for locked packet communication
* between 2 threads. One thread pushes, the other thread pulls.
*/
template<class T, int N = 16> // N must be a power of 2
class locked_single_producer_single_consumer_queue_T
{
public:
    /* When the packet fits in the queue, push shall return immediately. Otherwise it blocks until it can push the packet. */
    void push(T const& packet)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] {return ((m_tail - m_head) & m_mask) != 1; });
        m_data[m_head++] = packet;
        m_head &= m_mask;
        lock.unlock();
        m_cv.notify_one();
    }
    /* When a packet can be retrieved from the queue, pull shall return immediately. Otherwise it blocks until it can pull a packet. */
    bool pull(T& packet)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] {return (((m_head - m_tail) & m_mask) != 0) || m_done; });
        if (((m_head - m_tail) & m_mask) != 0) [[likely]]
        {
            packet = m_data[m_tail++];
            m_tail &= m_mask;
            lock.unlock();
            m_cv.notify_one();
            return true;
        }
        return false;
    }
    /* done() indicates that the pushing thread stopped. The pulling thread can continue reading
       the remainder of the queue and should then return */
    void done()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
        }
        m_cv.notify_one();
    }
private:
    static_assert((N & (N - 1)) == 0, "N must be a power of 2");
    static signed int const m_mask = N - 1;
    using data_t = std::array<T, N>;
    data_t m_data;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    int m_tail{ 0 };
    int m_head{ 0 };
    bool m_done{};
};

/*
* locked_single_producer_single_consumer_queue2_T is responsible for locked packet communication
* between 2 threads. One thread pushes, the other thread pulls.
*/
template<class T, int N = 16> // N must be a power of 2
class locked_single_producer_single_consumer_queue2_T
{
public:
    /* When the packet fits in the queue, push shall return immediately. Otherwise it blocks until it can push the packet. */
    void push(T const& packet)
    {
        m_available_space.acquire();
        int head = m_head.load(std::memory_order_acquire);
        m_data[head++ & m_mask] = packet;
        m_head.store(head, std::memory_order_release);
        m_available_packages.release();
    }
    /* When a packet can be retrieved from the queue, pull shall return immediately. Otherwise it blocks until it can pull a packet. */
    T pull()
    {
        m_available_packages.acquire();
        int tail = m_tail.load(std::memory_order_acquire);
        T packet = m_data[tail++ & m_mask];
        m_tail.store(tail, std::memory_order_release);
        m_available_space.release();
        return packet;
    }
private:
    static_assert((N & (N - 1)) == 0, "N must be a power of 2");
    static signed int const m_mask = N - 1;
    using data_t = std::array<T, N>;
    data_t m_data;
    std::atomic_int m_tail{ 0 };
    std::atomic_int m_head{ 0 };
    std::counting_semaphore<N> m_available_space{ N };
    std::counting_semaphore<N> m_available_packages{ 0 };
};

/******************************************************************************************************/

using implementation_t = bool;
implementation_t const condition_variable = false;
implementation_t const semaphore = true;

/*
* pusher() is a thread function that is responsible for pushing a defined
* sequence of integers in the lock_free queue
*/
std::atomic_int sum_ref{};
template<class queue_t>
void pusher(std::atomic_bool& do_continue_token, queue_t& queue)
{
    int i = 0;
    while (do_continue_token.load(std::memory_order_acquire))
    {
        queue.push(i);
        sum_ref += i;
        ++i;
    }
}

/*
* puller() is a thread function that is responsible for pulling
* integers from the lock_free queue, and compare it with the
* expected sequence
*/
std::atomic_int sum_check{};
template<implementation_t implementation, class queue_t>
int puller(queue_t& queue)
{
    int i{}; // value-initialized: returned unchanged if the queue stops before yielding anything
    if constexpr (implementation == condition_variable)
    {
        while (queue.pull(i))
        {
            sum_check += i;
        }
    }
    if constexpr (implementation == semaphore)
    {
        int j;
        while ((j = queue.pull()) != -1)
        {
            sum_check += j;
            i = j;
        }
    }
    return i;
}

/*
* test() is responsible for kicking off two threads that push and pull from
* the queue for a duration of 10s. Test returns the last integer value that was
* pulled from the queue as an indication of speed.
*/
template<implementation_t implementation, class queue_t>
int test()
{
    using namespace std::chrono_literals;
    std::atomic_bool do_continue_token(true);
    queue_t queue;
    std::cout << '<' << std::flush;
    std::future<void> fpusher = std::async(std::launch::async, pusher<queue_t>, std::ref(do_continue_token), std::ref(queue));
    std::future<int> fpuller = std::async(std::launch::async, puller<implementation, queue_t>, std::ref(queue));
    std::this_thread::sleep_for(10s);
    do_continue_token.store(false, std::memory_order_release);
    fpusher.wait();
    if constexpr (implementation == condition_variable)
    {
        queue.done(); // to stop the waiting thread
    }
    if constexpr (implementation == semaphore)
    {
        queue.push(-1); // to stop the waiting thread
    }
    int i = fpuller.get();
    if (sum_check != sum_ref)
    {
        throw; // no exception in flight, so this calls std::terminate on a sum mismatch
    }
    std::cout << '>' << std::endl;
    return i;
}

/*
* main() is responsible for performing multiple tests of different implementations.
* Results are collected, ordered and printed.
*/
int main()
{
    struct result_t
    {
        std::string m_name;
        int m_count;
    };
    using condition_variable_queue_t = locked_single_producer_single_consumer_queue_T<int, 1024>;
    using semaphore_queue_t = locked_single_producer_single_consumer_queue2_T<int, 1024>;
    std::vector<result_t> results // 6 runs
    {
        { "condition_variable", test<condition_variable, condition_variable_queue_t>() },
        { "semaphore", test<semaphore, semaphore_queue_t>() },
        { "condition_variable", test<condition_variable, condition_variable_queue_t>() },
        { "semaphore", test<semaphore, semaphore_queue_t>() },
        { "condition_variable", test<condition_variable, condition_variable_queue_t>() },
        { "semaphore", test<semaphore, semaphore_queue_t>() },
    };
    std::sort(results.begin(), results.end(), [](result_t const& lhs, result_t const& rhs) { return lhs.m_count < rhs.m_count; });
    std::cout << "The higher the count, the faster the solution" << std::endl;
    for (result_t const& result : results)
    {
        std::cout << result.m_name << ": " << result.m_count << std::endl;
    }
}

Output of a run:

<>
<>
<>
<>
<>
<>
The higher the count, the faster the solution
semaphore: 58304215
semaphore: 59302013
semaphore: 61896024
condition_variable: 84140445
condition_variable: 87045903
condition_variable: 90893057
Cowage answered 8/12, 2020 at 14:47 Comment(4)
How are you comparing the performance of your two programs? Without knowing this I doubt anyone can answer your question. My question is: why do you believe that two semaphores are faster than one condition variable? – Haemophiliac
Your semaphore version is using atomic variables. That triggers a lot of cache-coherence traffic and synchronization. No wonder it is slower. – Beanfeast
Moreover, instead of that smart tango with N, I would just use counting_semaphore::max(); more readable, less risky. – Beanfeast
After investigation on Win10-1809 + VS2022, I found that in the semaphore version the pusher syscalls into the kernel on almost every push, eating a lot of CPU cycles. The cv version uses an SRWLock, which spins 1024 times before suspending itself into the kernel; that suspension almost never happens, because the pusher usually releases the lock while the puller is still spinning, which saves the syscall on both ends. You would think you could write your own spin loop using semaphore::try_acquire, but it's not usable, because the MSVC implementation may fail try_acquire even when the internal counter is non-zero (CAS contention). – Trahan
My question kept bothering me, so I investigated Microsoft's current implementation of the counting semaphore. It holds two atomics and implements the blocking wait by waiting on one of them. Note that as long as the semaphore count does not reach zero, the atomic wait is never called. The implementation also only notifies (the atomic) when it is sure that at least one thread is waiting on it. Still, the semaphore implementation relies on the new C++20 atomic wait/notify functions.

The new C++20 wait/notify functions are implemented with a pool of condition variables. I guess that is close to optimal; at least I don't know of a faster way.

Bottom line: this semaphore implementation is itself built on condition variables, and then I can well imagine that the above-mentioned “condition variable implementation” is faster. Assuming the mutex is uncontended most of the time, taking it is cheap. Assuming that (due to the large queue size of 1024) we almost never have to wait on the condition variable predicate, m_cv.wait() is also cheap.

The “semaphore implementation” is in effect almost the same, except that now two atomics (m_head and m_tail) need to be read and written; in the “condition variable implementation” the mutex implicitly protected these variables. My conclusion is that these two atomics make the difference, and since you cannot do without them in the “semaphore implementation”, the “condition variable implementation” is faster.

To answer the question:

Q: Did I do something not clever so that my semaphore implementation is needlessly slow?

A: Not that I know of (yet).

Q: Is possibly the Microsoft counting semaphore implementation too slow?

A: Does not look like it

Q: Or do requirements in the C++ standard make the semaphore slow in general?

A: Again, does not look like it.

Q: Am I just mistaken that a queue is proper application for semaphores?

A: Yes; that was probably true only in the early days.

Q: When a queue is not a proper application, for what other application does the semaphore outperform the condition variable?

A: Don’t know yet. Possibly applications that simply wait for limited resources.

Cowage answered 17/12, 2020 at 14:42 Comment(0)
