One thread showing interest in another thread (consumer / producer)
I would like to have the possibility to make a thread (consumer) express interest in when another thread (producer) produces something. But not all the time.

Basically I want to make a one-shot consumer. Ideally the producer thread would go merrily about its business until one (or many) consumers signal that they want something, in which case the producer would push some data into a variable and signal that it has done so. The consumer will wait until the variable has been filled.

It must also be possible for the one-shot consumer to decide that it has waited too long and abandon the wait (à la pthread_cond_timedwait).

I've been reading many articles and SO questions about different ways to synchronize threads. Currently I'm leaning towards a condition variable approach.

I would like to know whether this is a good way to go about it (being a novice at thread programming, I probably have quite a few bugs in there), or whether it would be better to (ab)use semaphores for this situation. Or something else entirely, such as an atomic assignment to a pointer variable, if available? I currently don't see how these alternatives would work safely, probably because I'm trying to stay on the safe side: this application is supposed to run for months without locking up. Can I do without the mutexes in the producer, i.e. just signal a condition variable?

My current code looks like this:

consumer {
   pthread_mutex_lock(m);

   pred = true; /* signal interest */

   while (pred) {
       /* wait a bit and hopefully get an answer before timing out */
       pthread_cond_timedwait(c, m, t);

       /* it is possible that the producer never produces anything, in which
          case the pred will stay true, we must "designal" interest here,
          unfortunately the also means that a spurious wake could make us miss
          a good answer, no? How to combat this? */
       pred = false;
   }

   /* if we got here that means either an answer is available or we timed out */
   //... (do things with answer if not timed out, otherwise assign default answer)

   pthread_mutex_unlock(m);
}

/* this thread is always producing, but it doesn't always have listeners */
producer {
   pthread_mutex_lock(m);

   /* if we have a listener */
   if (pred) {
      buffer = "work!";

      pred = false;

      pthread_cond_signal(c);
   }

   pthread_mutex_unlock(m);
}

NOTE: I'm on a modern Linux and can make use of platform-specific functionality if necessary.

NOTE 2: I used the seemingly global variables m, c, and t, but these would be different for every consumer.

High-level recap

I want a thread to be able to register for an event, wait for it for a specified time and then carry on. Ideally it should be possible for more than one thread to register at the same time and all threads should get the same events (all events that came in the timespan).

Pursy answered 16/4, 2013 at 10:47
What you want is something similar to a std::future in C++ (doc). A consumer requests a task to be performed by a producer through a specific function. That function creates a struct called a future (or promise), holding a mutex, a condition variable associated with the task, and a void pointer for the result, and returns it to the caller. It also puts that struct, the task id, and the parameters (if any) in a work queue handled by the producer.

struct future_s {
    pthread_mutex_t m;
    pthread_cond_t c;
    int flag;       /* set once the result is available */
    void *result;
};

// basic task outline
struct task_s {
    struct future_s result;
    int taskid;
};

// specific "mytask" task
struct mytask_s {
    struct future_s result;
    int taskid;
    int p1;
    float p2;
};

struct future_s *do_mytask(int p1, float p2) {
    // allocate task data (alloc_task is assumed to also initialize the
    // mutex, condition variable and flag in the embedded future)
    struct mytask_s *t = alloc_task(sizeof(struct mytask_s));
    t->p1 = p1;
    t->p2 = p2;
    t->taskid = MYTASK_ID;
    task_queue_add(t);
    return (struct future_s *)t;
}

Then the producer pulls the task out of the queue, processes it and, once finished, puts the result in the future and signals the condition variable.

The consumer may wait for the future or do something else.

For cancellable futures, include a flag in the struct to indicate that the task was cancelled. The future is then either:

  • delivered, the consumer is the owner and must deallocate it
  • cancelled, the producer remains the owner and disposes of it.

The producer must therefore check that the future has not been cancelled before triggering the condition variable.
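A producer-side completion routine with that check might look like the following sketch; future_complete and the cancelled field are assumed names added for illustration, not part of the code above:

```c
#include <pthread.h>

struct future_s {
    pthread_mutex_t m;
    pthread_cond_t c;
    int flag;       /* set once the result is available */
    int cancelled;  /* set if the consumer gave up waiting */
    void *result;
};

/* Called by the producer when the task is done. Returns 1 if the result
   was delivered (the consumer now owns the future), 0 if the future was
   cancelled and the producer must dispose of it. */
int future_complete(struct future_s *f, void *result)
{
    int delivered;
    pthread_mutex_lock(&f->m);
    if (f->cancelled) {
        delivered = 0;              /* consumer is gone: keep ownership */
    } else {
        f->result = result;
        f->flag = 1;
        pthread_cond_signal(&f->c); /* wake the waiting consumer */
        delivered = 1;
    }
    pthread_mutex_unlock(&f->m);
    return delivered;
}
```

Because both the cancellation check and the delivery happen under the same mutex the consumer uses, there is no window in which a consumer cancels after the producer has decided to deliver.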

For a "shared" future, the flag turns into a number of subscribers. If the number is above zero, the order must be delivered. The consumer owning the result is left to be decided between all consumers (First come first served? Is the result passed along to all consumers?).

Any access to the future struct must be mutexed (which works well with the condition variable).

Regarding the queues, they may be implemented using a linked list or an array (for versions with limited capacity). Since the functions creating the futures may be called concurrently, they have to be protected with a lock, which is usually implemented with a mutex.
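A minimal mutex-protected linked-list queue along those lines could be sketched as follows. Note that task_queue_add here takes the queue explicitly, unlike the one-argument call in the snippet above, and all helper names are assumptions:

```c
#include <pthread.h>
#include <stdlib.h>

struct task_s;  /* as defined above; only pointers are needed here */

struct task_node {
    struct task_s *task;
    struct task_node *next;
};

struct task_queue {
    pthread_mutex_t m;
    pthread_cond_t nonempty;
    struct task_node *head, *tail;
};

/* Called concurrently by consumers creating futures. */
void task_queue_add(struct task_queue *q, struct task_s *t)
{
    struct task_node *n = malloc(sizeof *n);
    n->task = t;
    n->next = NULL;
    pthread_mutex_lock(&q->m);
    if (q->tail) q->tail->next = n; else q->head = n;
    q->tail = n;
    pthread_cond_signal(&q->nonempty);  /* wake a sleeping producer */
    pthread_mutex_unlock(&q->m);
}

/* Producer side: blocks until a task is available, returns it FIFO. */
struct task_s *task_queue_take(struct task_queue *q)
{
    pthread_mutex_lock(&q->m);
    while (!q->head)
        pthread_cond_wait(&q->nonempty, &q->m);
    struct task_node *n = q->head;
    q->head = n->next;
    if (!q->head) q->tail = NULL;
    pthread_mutex_unlock(&q->m);
    struct task_s *t = n->task;
    free(n);
    return t;
}
```

The condition variable on the queue lets the producer sleep when there is no work, which matches the requirement that it "go merrily about its business" until a consumer signals interest.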

Submerged answered 19/4, 2013 at 19:35
I really like this advice; abstracting it into futures seems like a clean way to do it, and I could always swap out pthreads if I decide to go cross-platform to Windows. However, I'd like some clarification on one thing: how do you see the "work queue" being handled? Locked tight by a mutex for every queue (if I decide to have possibly more than one)? – Pursy

Yes. I updated my answer to include that idea. It is possible to make lockless versions of containers, but for the time being you should probably start with locked ones, and if needed implement a lockless solution later and ensure that there is a performance gain (it's not necessarily the case). – Submerged

Thanks, I'm satisfied that at least the base of my idea was OK (but needed a tad more abstraction). I'm concerned about performance in the long run, but even more than that I'm concerned about stability. It absolutely, can, not, fail. That's why I've grudgingly allowed threads in my app even though I'm not very experienced with them. I know enough to see the havoc they can cause when used without care. But I need it for my epoll_loop... – Pursy

Well, the less threads share, the better for stability, maintenance, and evolvability. That's why I never use a setup without message passing between threads, with only a few shared variables for signalling, and even then, if I can somehow fold that into the messaging protocol, all the better. I think that compartmentalizing threads is essential for a good design. – Submerged

Futures are a good abstraction, but they still force sharing. I offered this solution because it's simple and straightforward in your case, but for a classic producer/consumer setup, I would just let the producer finish its computation and have the consumer throw the result away if it came too late. That said, for a costly computation, it might be worth including that cancelling mechanism. – Submerged
