C++ Equivalent to Java's BlockingQueue
Asked Answered
B

4

52

I'm in the process of porting some Java code over to C++, and one particular section makes use of a BlockingQueue to pass messages from many producers to a single consumer.

If you are not familiar with what a Java BlockingQueue is, it is just a queue that has a hard capacity, which exposes thread safe methods to put() and take() from the queue. put() blocks if the queue is full, and take() blocks if the queue is empty. Also, timeout-sensitive versions of these methods are supplied.

Timeouts are relevant to my use-case, so a recommendation that supplies those is ideal. If not, I can code up some myself.

I've googled around and quickly browsed the Boost libraries and I'm not finding anything like this. Maybe I'm blind here...but does anyone know of a good recommendation?

Thanks!

Bounden answered 9/10, 2012 at 17:13 Comment(3)
hand made class that has an array(maybe deque instead of array for easier pop_front push_back) and mutex?Blimey
is hard capacity really a requirement?Uncouple
In my case, yes. Its very possible that producers outpace consumers, and I have a need to either block threads on the producer side, or otherwise reject their input, lest I run out of memory!Bounden
P
72

It isn't fixed size and it doesn't support timeouts but here is a simple implementation of a queue I had posted recently using C++ 2011 constructs:

#include <mutex>
#include <condition_variable>
#include <deque>

template <typename T>
class queue
{
private:
    std::mutex              d_mutex;
    std::condition_variable d_condition;
    std::deque<T>           d_queue;
public:
    void push(T const& value) {
        {
            std::unique_lock<std::mutex> lock(this->d_mutex);
            d_queue.push_front(value);
        }
        this->d_condition.notify_one();
    }
    T pop() {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
        T rc(std::move(this->d_queue.back()));
        this->d_queue.pop_back();
        return rc;
    }
};

It should be trivial to extend and use a timed wait for popping. The main reason I haven't done it is that I'm not happy with the interface choices I have thought of so far.

Panier answered 9/10, 2012 at 17:56 Comment(12)
is the scoping in push neccessary ? I guess you are trying to unlock the mutex... but Im not sure of the prerequirements for notify_one.Blimey
The scope in push() isn't necessary but without it the condition variable is signaled while the lock is still held. Releasing the lock prior to signaling makes the lock readily available.Ferraro
could someone extent this example for timed wait for popping?Osiris
here I posted similar code for review, probably some comments may be helpful codereview.stackexchange.com/questions/49820/thread-safe-queueOsiris
how many consumers and producers this implementations supports?Osiris
@javapowered: given that there are locks being used it doesn't really care! There is probably a limit on threads which can wait on a lock but I can't imagine that this would relevant.Ferraro
Is it a matter of style to use d_ or m_ as prefix for member variables or there are some semantics in this choice?Valorie
@Isaac: It is a matter of style. In different organizations different indicators for member variables are used. In different organizations I have used nothing, a m_-prefix, a d_-prefix, a _-suffix, and an [ill-advised] _-prefix. Currently, I'm working in an organization where a d_-prefix is used.Ferraro
Would it make sense to use condition_variable::wait_for instead of condition_variable::wait ?Priestcraft
@nurettin: depends on your needs: if you want/need to be able to bail out after some time, eg., to shutdown the system, wait_for() together with handling of the timeout may be reasonable. Normally I’d enqueue function objects and I would shutdown via a corresponding function object being enqueued, ie., there wouldn’t be a need for dealing with timeouts.Ferraro
a small question regarding the lock, lets say the pop method is blocked at wait(), it means the mutex is locked, so no thread will be able to push into the queue so we are deadlocked. am i missing something?Grilled
@Nitzanu: yes, you are missing that the defining feature of a condition variable is that it releases the lock and puts the thread to sleep without loosing a signal. I particular the lock will be released by wait(). This is the reason why a unique_lock is used rather then a lock_guard as the latter doesn’t have an interface to release the lock without destruction. When wait() returns it will reacquire the lock. It is important to realize that the lock is released during the wait() as anything else the lock protects may also change.Ferraro
D
6

Here's an example of a blocking queue with shutdown request feature:

template <typename T> class BlockingQueue {
  std::condition_variable _cvCanPop;
  std::mutex _sync;
  std::queue<T> _qu;
  bool _bShutdown = false;

public:
  void Push(const T& item)
  {
    {
      std::unique_lock<std::mutex> lock(_sync);
      _qu.push(item);
    }
    _cvCanPop.notify_one();
  }

  void RequestShutdown() {
    {
      std::unique_lock<std::mutex> lock(_sync);
      _bShutdown = true;
    }
    _cvCanPop.notify_all();
  }

  bool Pop(T &item) {
    std::unique_lock<std::mutex> lock(_sync);
    for (;;) {
      if (_qu.empty()) {
        if (_bShutdown) {
          return false;
        }
      }
      else {
        break;
      }
      _cvCanPop.wait(lock);
    }
    item = std::move(_qu.front());
    _qu.pop();
    return true;
  }
};
Dalpe answered 1/9, 2018 at 12:27 Comment(0)
S
1

U should write the class of semephore first

#ifndef SEMEPHORE_H
#define SEMEPHORE_H
#include <mutex>
#include <condition_variable>

class semephore {
public:
    semephore(int count = 0)
        : count(count),
          m(),
          cv()
    {

    }

    void await() {
        std::unique_lock<std::mutex> lk(m);
        --count;
        if (count < 0) {
            cv.wait(lk);
        }
    }

    void post() {
        std::unique_lock<std::mutex> lk(m);
        ++count;
        if (count <= 0) {
            cv.notify_all();
        }
    }
    
private:
    int count;
    std::mutex m;
    std::condition_variable cv;
};

#endif // SEMEPHORE_H

now the blocked_queue can use the semephore to deal with it

#ifndef BLOCKED_QUEUE_H
#define BLOCKED_QUEUE_H
#include <list>
#include "semephore.h"

template <typename T>
class blocked_queue {
public:
    blocked_queue(int count) 
        : s_products(),
          s_free_space(count),
          li()
    {

    }

    void put(const T &t) {
        s_free_space.await();
        li.push_back(t);
        s_products.post();
    }

    T take() {
        s_products.await();
        T res = li.front();
        li.pop_front();
        s_free_space.post();
        return res;
    }
private:
    semephore s_products;
    semephore s_free_space;
    std::list<T> li;
};

#endif // BLOCKED_QUEUE_H

Semester answered 28/11, 2021 at 7:15 Comment(0)
P
0

OK I'm a bit late to the party but I think this is a better fit for the Java's BlockingQueue implementation. Here I too use one mutex and two conditions to look after not full and not empty. IMO a BlockingQueue makes more sense with limited capacity which I didn't see in the other answers. I include a simple test scenario too:

#include <iostream>
#include <algorithm>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>

template<typename T>
class blocking_queue {
private:
    size_t _capacity;
    std::queue<T> _queue;
    std::mutex _mutex;
    std::condition_variable _not_full;
    std::condition_variable _not_empty;

public:
    inline blocking_queue(size_t capacity) : _capacity(capacity) {
        // empty
    }

    inline size_t size() const {
        std::unique_lock<std::mutex> lock(_mutex);
        return _queue.size();
    }

    inline bool empty() const {
        std::unique_lock<std::mutex> lock(_mutex);
        return _queue.empty();
    }

    inline void push(const T& elem) {
        {
            std::unique_lock<std::mutex> lock(_mutex);

            // wait while the queue is full
            while (_queue.size() >= _capacity) {
                _not_full.wait(lock);
            }
            std::cout << "pushing element " << elem << std::endl;
            _queue.push(elem);
        }
        _not_empty.notify_all();
    }

    inline void pop() {
        {
            std::unique_lock<std::mutex> lock(_mutex);

            // wait while the queue is empty
            while (_queue.size() == 0) {
                _not_empty.wait(lock);
            }
            std::cout << "popping element " << _queue.front() << std::endl;
            _queue.pop();
        }
        _not_full.notify_one();
    }

    inline const T& front() {
        std::unique_lock<std::mutex> lock(_mutex);

        // wait while the queue is empty
        while (_queue.size() == 0) {
            _not_empty.wait(lock);
        }
        return _queue.front();
    }
};

int main() {
    blocking_queue<int> queue(5);

    // create producers
    std::vector<std::thread> producers;
    for (int i = 0; i < 10; i++) {
        producers.push_back(std::thread([&queue, i]() {
            queue.push(i);
            // produces too fast
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }));
    }

    // create consumers
    std::vector<std::thread> consumers;
    for (int i = 0; i < 10; i++) {
        producers.push_back(std::thread([&queue, i]() {
            queue.pop();
            // consumes too slowly
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }));
    }

    std::for_each(producers.begin(), producers.end(), [](std::thread &thread) {
        thread.join();
    });

    std::for_each(consumers.begin(), consumers.end(), [](std::thread &thread) {
        thread.join();
    });

    return EXIT_SUCCESS;
}
Prothalamium answered 27/8, 2019 at 21:39 Comment(3)
Even though the STL queue works like this as well (you have to use front() to access the first element and then pop() it afterwards), I don't think this kind of interface works well in a multithreaded scenario, since there's no way for you to pop the first element and get its value in one operation. If you do front() then pop(), you may remove a different element then the one you just obtained. Or am I missing something here?Zlatoust
Very true, it could be easily extended to do a pop2 or frontAndPop which will fuse these into one and then cover this use-case where atomic thread-safe access is necessary instead of a two step approach.Prothalamium
for the size() and empty() methods, do you really need to obtain a lock ? it seems that you will deadlock in this caseInsociable

© 2022 - 2024 — McMap. All rights reserved.