Consumer/producer in C++
K

5

5

This is a classic producer-consumer problem in which some threads produce data while others read it. The producers and consumers share a fixed-size buffer. If the buffer is empty, the consumers have to wait; if it is full, the producers have to wait. I am using semaphores to keep track of the free and filled slots: the producer decrements the free-slots semaphore, adds a value, and increments the filled-slots semaphore.

I am trying to implement a program that gets some numbers from a generator function and then prints out their average. By treating this as a producer-consumer problem, I hope to save some execution time. The generateNumber function introduces a delay, so I want to create a number of threads that generate numbers and put them into a queue. The "main thread", which runs the main function, then has to read from the queue and compute the sum and then the average. Here is what I have so far:

#include <cstdio> 
#include <cstdlib>
#include <time.h>
#include "Thread.h" 
#include <queue> 

int generateNumber() {
    int delayms = rand() / (float) RAND_MAX * 400.f + 200;
    int result = rand() / (float) RAND_MAX * 20;
    struct timespec ts;
    ts.tv_sec = 0;
    ts.tv_nsec = delayms * 1000000;
    nanosleep(&ts, NULL);
    return result; }


struct threadarg {
    Semaphore filled(0);
    Semaphore empty(n);
    std::queue<int> q; };


void* threadfunc(void *arg) {
    threadarg *targp = (threadarg *) arg;
    threadarg &targ = *targp;
    while (targ.empty.value() != 0) {
        int val = generateNumber();
        targ.empty.dec(); 
        q.push_back(val);
        targ.filled.inc(); }
}
int main(int argc, char **argv) {
    Thread consumer, producer;
    // read the command line arguments
    if (argc != 2) {
        printf("usage: %s [nums to average]\n", argv[0]);
        exit(1); }
    int n = atoi(argv[1]);
    // Seed random number generator
    srand(time(NULL));
}

I am a bit confused now because I am not sure how to create multiple producer threads that generate numbers (as long as q is not full) while the consumer reads from the queue (as long as q is not empty). I am not sure what to put in main to implement this. Also, "Thread.h" lets you create a thread, a mutex, or a semaphore. A thread has the methods .run(threadFunc, arg), .join(), etc. A mutex can be locked or unlocked. The semaphore methods have all been used in my code.

Kazachok answered 22/2, 2012 at 14:0 Comment(4)
Hi Dan, you didn't accept any of the answers that were given to you. Please give some incentive to the community to answer your questions.Punchball
I am so sorry I didn't even realize that was an option until now! I accepted the answers to all the questions I had previously asked.Kazachok
So thanks for the responses. However, it is not so much the code I am struggling with, I am just not sure where to define what especially with the consumers.Kazachok
In your posted code, you have one threadfunc, and it's the producer: just rename it producer and write another function called consumer. The producer is already pushing its products, so that's OK; you just need to bind it to Thread producer so it actually runs. If you want multiple producers, run the same function in multiple Thread objects. The consumer function needs to pop in a loop and do its calculation. Lastly, you need to figure out how the producers and consumers know when to stop!Amersfoort
A
9

Your queue is not synchronized, so multiple producers could call push at the same time, or at the same time as the consumer is calling pop (std::queue has push and pop, not push_back and pop_front). This will break.

The simple approach to making this work is to use a thread-safe queue, which can be a wrapper around the std::queue you already have, plus a mutex.

You can start by adding a mutex, and locking/unlocking it around each call you forward to std::queue. For a single consumer that should be sufficient; for multiple consumers you'd need to fuse front() and pop() into a single synchronized call.

To let the consumer block while the queue is empty, you can add a condition variable to your wrapper.

That should be enough for you to find the answer online; sample code is below.


#include <condition_variable>
#include <mutex>
#include <queue>

template <typename T> class SynchronizedQueue
{
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable condvar_;

    typedef std::lock_guard<std::mutex> lock;
    typedef std::unique_lock<std::mutex> ulock;

public:
    void push(T const &val)
    {
        lock l(mutex_); // prevents multiple pushes corrupting queue_
        queue_.push(val);
        // Notify on every push: with several consumers, notifying only when
        // the queue was empty could leave a second waiting consumer asleep.
        condvar_.notify_one();
    }

    T pop()
    {
        ulock u(mutex_);
        while (queue_.empty())
            condvar_.wait(u); // releases the lock while waiting, re-acquires on wake
        // now queue_ is non-empty and we still have the lock
        T retval = queue_.front();
        queue_.pop();
        return retval;
    }
};

Replace std::mutex et al with whatever primitives your "Thread.h" gives you.
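
To connect this back to the question (several producer threads, with main as the consumer that averages), here is a minimal sketch of how the pieces might fit together. It assumes C++11 std::thread rather than "Thread.h"; the names averageOf, fakeGenerate and nextTicket are made up for illustration, and fakeGenerate is a deterministic stand-in for generateNumber() so the result can be checked. The producers know when to stop because they claim "tickets" from a shared atomic counter until n values have been handed out.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Same shape as the SynchronizedQueue above, repeated so this compiles on its own.
template <typename T> class SynchronizedQueue {
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable condvar_;
public:
    void push(T const &val) {
        {
            std::lock_guard<std::mutex> l(mutex_);
            queue_.push(val);
        }
        condvar_.notify_one(); // wake one waiting consumer, if any
    }
    T pop() {
        std::unique_lock<std::mutex> u(mutex_);
        condvar_.wait(u, [this] { return !queue_.empty(); });
        T retval = queue_.front();
        queue_.pop();
        return retval;
    }
};

// Hypothetical stand-in for generateNumber(): deterministic, no delay.
int fakeGenerate(int ticket) { return ticket % 20; }

// Starts `producers` threads that together produce exactly n values;
// the calling thread is the single consumer and returns the average.
double averageOf(int n, int producers) {
    SynchronizedQueue<int> q;
    std::atomic<int> nextTicket(0); // shared count of values claimed so far
    std::vector<std::thread> pool;
    for (int i = 0; i < producers; ++i) {
        pool.emplace_back([&q, &nextTicket, n] {
            for (;;) {
                int ticket = nextTicket.fetch_add(1);
                if (ticket >= n) break; // all n values are already claimed
                q.push(fakeGenerate(ticket));
            }
        });
    }
    long sum = 0;
    for (int i = 0; i < n; ++i) sum += q.pop(); // blocks while the queue is empty
    for (std::thread &t : pool) t.join();
    return n > 0 ? static_cast<double>(sum) / n : 0.0;
}
```

In main you would call something like averageOf(atoi(argv[1]), 4) and print the result. Note that this queue is unbounded, so producers never wait; bounding it needs the extra "queue is full" condition discussed in the other answers.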

Amersfoort answered 22/2, 2012 at 14:20 Comment(2)
also what do you mean by the second to last line about the consumer block?Kazachok
assuming your consumer will call pop() to fetch the next result: if the queue is empty, this should block until a producer adds something, and then return it; sample code coming.Amersfoort
R
1

What I would do is this:

  • Make a data class that hides your queue
  • Create thread-safe accessor methods for saving a piece of data to the q, and removing a piece of data from the q (I would use a single mutex, or a critical section, for the accessors)
  • Handle the case where a consumer doesn't have any data to work with (sleep)
  • Handle the case where the q is becoming too full, and the producers need to slow down
  • Let the threads go willy-nilly adding and removing as they produce / consume

Also, remember to add a sleep into each and every thread, or else you'll peg the CPU and not give the thread scheduler a good spot to switch contexts and share the CPU with other threads / processes. You don't need to, but it's a good practice.

Rosemare answered 22/2, 2012 at 14:22 Comment(5)
I will vote but your suggestion is not clarifying this problem.Kazachok
Actually, changing the architecture like this inherently fixes the problem.Rosemare
... unless your problem isn't with concurrency, and instead with creating code that can run multiple threads...Rosemare
"remember to add a sleep into each and every thread" : it helps me alot. thanksGrossman
Willy-nilly for the win. That's how it's done.Tripe
B
0

When managing shared state like this, you need a condition variable and a mutex. The basic pattern is a function along the lines of:

ScopedLock l( theMutex );
while ( !conditionMet ) {
    theCondition.wait( theMutex );
}
doWhatever();
theCondition.notify();

In your case, I'd probably make the condition variable and the mutex members of the class implementing the queue. To write, the conditionMet would be !queue.full(), so you'd end up with something like:

ScopedLock l( queue.myMutex );
while ( queue.full() ) {
    queue.myCondition.wait( queue.myMutex );
}
queue.insert( whatever );
queue.myCondition.notify();

and to read:

ScopedLock l( queue.myMutex );
while ( queue.empty() ) {
    queue.myCondition.wait( queue.myMutex );
}
results = queue.extract();
queue.myCondition.notify();
return results;

Depending on the threading interface, there may be two notify functions: notify one (which wakes up a single thread), and notify all (which wakes up all of the waiting threads); in this case, you'll need notify all (or you'll need two condition variables, one for space to write, and one for something to read, with each function waiting on one, but notifying the other).
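
For the two-condition-variable variant mentioned in the parenthesis, a sketch in standard C++11 might look like the following. This is only an illustration under my own assumptions: BoundedQueue, notFull_, notEmpty_ and the helper sumThroughQueue are invented names, and std::mutex / std::condition_variable stand in for whatever your threading interface provides. Each side waits on one condition and notifies the other, so notify one is enough.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>

template <typename T> class BoundedQueue {
    std::deque<T> items_;
    const std::size_t capacity_;
    std::mutex mutex_;
    std::condition_variable notFull_;  // writers wait here for free space
    std::condition_variable notEmpty_; // readers wait here for data
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    void insert(T const &val) {
        std::unique_lock<std::mutex> lock(mutex_);
        notFull_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push_back(val);
        lock.unlock();          // release before notifying
        notEmpty_.notify_one(); // there is now something to read
    }

    T extract() {
        std::unique_lock<std::mutex> lock(mutex_);
        notEmpty_.wait(lock, [this] { return !items_.empty(); });
        T val = items_.front();
        items_.pop_front();
        lock.unlock();
        notFull_.notify_one();  // there is now space to write
        return val;
    }
};

// Hypothetical helper: one producer pushes 1..count, the caller consumes and sums.
int sumThroughQueue(int count, std::size_t capacity) {
    BoundedQueue<int> q(capacity);
    std::thread producer([&q, count] {
        for (int i = 1; i <= count; ++i) q.insert(i); // blocks while q is full
    });
    int sum = 0;
    for (int i = 0; i < count; ++i) sum += q.extract(); // blocks while q is empty
    producer.join();
    return sum;
}
```

The predicate overload of wait() is the same while loop as in the pattern above, just written more compactly.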

Backandforth answered 22/2, 2012 at 15:15 Comment(0)
D
0

Protect the queue accesses with a mutex; that should be it. A 'Computer Science 101' bounded producer-consumer queue needs two semaphores (to manage the free and filled counts, and for producers and consumers to wait on, as you are already doing) and one mutex/futex/criticalSection to protect the queue.

I don't see how replacing the semaphores and mutex with condvars is any great help. What's the point? How do you implement a bounded producer-consumer queue with condvars that works on all platforms with multiple producers/consumers?
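
For reference, the two-semaphores-plus-mutex layout could be sketched roughly like this. Big caveat: I don't know what the Semaphore in "Thread.h" looks like, so the Semaphore class here is a hypothetical stand-in with the same inc()/dec() names the question uses (built from a mutex and condvar only because C++11 has no standard semaphore; C++20 adds std::counting_semaphore). BoundedBuffer and sumWithProducers are likewise made-up names.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stand-in for the "Thread.h" Semaphore: dec() blocks at zero.
class Semaphore {
    std::mutex m_;
    std::condition_variable cv_;
    unsigned count_;
public:
    explicit Semaphore(unsigned initial) : count_(initial) {}
    void inc() {
        { std::lock_guard<std::mutex> l(m_); ++count_; }
        cv_.notify_one();
    }
    void dec() {
        std::unique_lock<std::mutex> l(m_);
        cv_.wait(l, [this] { return count_ > 0; });
        --count_;
    }
};

class BoundedBuffer {
    Semaphore empty_;  // counts free slots; producers wait on it
    Semaphore filled_; // counts stored items; consumers wait on it
    std::mutex mutex_; // protects q_ itself
    std::queue<int> q_;
public:
    explicit BoundedBuffer(unsigned capacity) : empty_(capacity), filled_(0) {
        assert(capacity > 0);
    }
    void put(int val) {
        empty_.dec(); // wait for a free slot
        { std::lock_guard<std::mutex> l(mutex_); q_.push(val); }
        filled_.inc(); // announce one more item
    }
    int take() {
        filled_.dec(); // wait for an item
        int val;
        { std::lock_guard<std::mutex> l(mutex_); val = q_.front(); q_.pop(); }
        empty_.inc(); // announce one more free slot
        return val;
    }
};

// Hypothetical driver: each producer puts 1..perProducer; the caller consumes all.
long sumWithProducers(int producers, int perProducer, unsigned capacity) {
    BoundedBuffer buf(capacity);
    std::vector<std::thread> pool;
    for (int p = 0; p < producers; ++p)
        pool.emplace_back([&buf, perProducer] {
            for (int i = 1; i <= perProducer; ++i) buf.put(i);
        });
    long sum = 0;
    for (int i = 0; i < producers * perProducer; ++i) sum += buf.take();
    for (std::thread &t : pool) t.join();
    return sum;
}
```

The semaphores are taken outside the mutex on purpose: waiting on a semaphore while holding the queue mutex would block the other side from ever making progress.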

Demesne answered 22/2, 2012 at 18:22 Comment(0)
H
-1
#include<iostream>
#include<deque>
#include<mutex>
#include<chrono>
#include<condition_variable>
#include<thread>
#include<cstdlib> // rand()
using namespace std;
mutex mu,c_out;
condition_variable cv;
class Buffer
{
public:
    Buffer() {}
    void add(int ele)
    {
        unique_lock<mutex> ulock(mu);
        cv.wait(ulock,[this](){return q.size()<_size;});
        q.push_back(ele);
        ulock.unlock(); // unlock via the unique_lock; a raw mu.unlock() makes its destructor unlock again
        cv.notify_all();
        return;
    }
    int remove()
    {
     unique_lock<mutex> ulock(mu);
     cv.wait(ulock,[this](){return q.size()>0;});
     int v=q.front(); // take the oldest element so the buffer behaves as a FIFO queue
     q.pop_front();
     ulock.unlock(); // unlock via the unique_lock; a raw mu.unlock() makes its destructor unlock again
     cv.notify_all();
     return v;
    }
    int calculateAvarage()
    {
        int total=0;
        unique_lock<mutex> ulock(mu);
        cv.wait(ulock,[this](){return q.size()>0;});
        deque<int>::iterator it = q.begin();
        while (it != q.end())
        {
            total += *it;
            std::cout << ' ' << *it++;
        }
        return total/q.size();
    }
private:
    deque<int> q;
    const unsigned int _size=10;
};
class Producer
{
public:
    Producer(Buffer *_bf=NULL)
    {
        this->bf=_bf;
    }
    void Produce()
    {
        while(true)
        {
            int num=rand()%10;
            bf->add(num);
            c_out.lock();
            cout<<"Produced:"<<num<<"avarage:"<<bf->calculateAvarage()<<endl;
            this_thread::sleep_for(chrono::microseconds(5000));
            c_out.unlock();
        }
    }
private:
    Buffer *bf;
};
class Consumer
{
public:
    Consumer(Buffer *_bf=NULL)
    {
        this->bf=_bf;
    }
    void Consume()
    {
        while (true)
        {
            int num=bf->remove();
            c_out.lock();
            cout<<"Consumed:"<<num<<"avarage:"<<bf->calculateAvarage()<<endl;
            this_thread::sleep_for(chrono::milliseconds(5000));
            c_out.unlock();
        }
    }
private:
    Buffer *bf;
};
int main()
{
    Buffer b;
    Consumer c(&b);
    Producer p(&b);
    thread th1(&Producer::Produce,&p);
    thread th2(&Consumer::Consume,&c);
    th1.join();
    th2.join();
    return 0;
}

The Buffer class holds a double-ended queue with a maximum size of 10. It has two functions, one to add to the queue and one to remove from it. It also has a calculateAvarage() function, which calculates the average each time an element is added or removed.

There are two more classes, Producer and Consumer, each holding a pointer to the Buffer. Consumer has Consume() and Producer has Produce(). Consume() locks the buffer, waits until the buffer is not empty, removes an element, then unlocks and notifies the producer. Produce() locks the buffer, waits until the buffer is below its maximum size, adds an element, then unlocks and notifies the consumer.

Hilbert answered 21/6, 2021 at 18:47 Comment(1)
A global condition_variable and mutexes!? And publicly accessible (no static)? Nope. Your answer introduces additional contention when there are several unrelated uses of the class, because the mutex and condition variable are shared across all instances!!Tripe
