asio::io_service and thread_group lifecycle issue
Looking at answers like this one, we can do stuff like:

boost::asio::io_service ioService;
boost::thread_group threadpool;
{
    boost::asio::io_service::work work(ioService);
    threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));
    threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));
    ioService.post(boost::bind(...));
    ioService.post(boost::bind(...));
    ioService.post(boost::bind(...));
}
threadpool.join_all();

However, in my case I want to do something like:

while (condition)
{
    ioService.post(boost::bind(...));
    ioService.post(boost::bind(...));
    ioService.post(boost::bind(...));
    threadpool.join_all();

    // DO SOMETHING WITH RESULTS
}

With this structure, though, the boost::asio::io_service::work work(ioService) line no longer has a place, and as far as I can see I cannot recreate the work object without also creating every thread in the pool again.

In my code the thread-creation overhead seems negligible (it actually performs better than my previous mutex-based code), but is there a cleaner way to do this?

Denti asked 16/12, 2015 at 5:04
while (condition)
{
    //... stuff
    threadpool.join_all();

    //... 
}

Doesn't make any sense, because you can only join threads once. Once joined, they are gone. You don't want to be starting new threads all the time (use a thread pool + task queue¹).

Since you don't want to actually stop the threads, you probably don't want to destruct the work object at all. If you insist, a shared_ptr<work> or optional<work> works nicely (just call my_work.reset() on it).
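
For illustration, here is a minimal sketch of the shared_ptr<work> variant (the surrounding setup just mirrors the snippet from the question; it is not part of the answer's original code):

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

int main()
{
    boost::asio::io_service ioService;
    boost::thread_group threadpool;

    // hold the work object through a resettable handle
    boost::shared_ptr<boost::asio::io_service::work> my_work(
        new boost::asio::io_service::work(ioService));

    threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));
    threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));

    // ... ioService.post(...) the jobs here ...

    my_work.reset();       // run() may now return once the queue is empty
    threadpool.join_all(); // only now can the threads be joined
}

Keep in mind that once run() returns you would have to call ioService.reset() and start run() again before posting more work, which is exactly why not destructing the work at all is the cleaner option here.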

¹ The original answer links to complete implementations of a proper thread_pool, including one based on Boost Asio's io_service (see the comments below).

UPDATE

A simple extension to "SOLUTION #2" (one of the linked thread-pool implementations) makes it possible to wait until all queued tasks have been picked up, without joining the workers or destroying the pool:

  void drain() {
      unique_lock<mutex> lk(mx);
      namespace phx = boost::phoenix;
      cv.wait(lk, phx::empty(phx::ref(_queue)));
  }

Note that for reliable operation, one needs to signal the condition variable on de-queue as well:

      cv.notify_all(); // in order to signal drain

CAVEATS

  1. It's an interface inviting race conditions (the queue could accept jobs from many threads, so once drain() returns, another thread could have posted a new task already)

  2. This signals when the queue is empty, not when the tasks have completed. The queue cannot know about that; if you need it, use a barrier or signal a condition variable from within the task (the_work in this example; see the sketch below). The mechanism for queuing/scheduling is not relevant there.
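
If you do need to wait for the tasks themselves to finish, one rough sketch (the completion_latch type and its member names are made up for illustration; they are not part of the answer) is a counter guarded by the same mutex/condition_variable pattern:

#include <boost/thread.hpp>

struct completion_latch
{
    boost::mutex mx;
    boost::condition_variable cv;
    unsigned pending;

    completion_latch() : pending(0) {}

    void add()  { boost::lock_guard<boost::mutex> lk(mx); ++pending; }

    void done()
    {
        boost::lock_guard<boost::mutex> lk(mx);
        if (--pending == 0)
            cv.notify_all();
    }

    // blocks until every add() has been matched by a done()
    void wait()
    {
        boost::unique_lock<boost::mutex> lk(mx);
        while (pending)
            cv.wait(lk);
    }
};

Call add() before enqueueing each job, have the job call done() as its last step, and call wait() where you would otherwise drain(); that waits for completion rather than for an empty queue.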

DEMO

Live On Coliru

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>
#include <boost/atomic.hpp>
#include <boost/function.hpp>
#include <deque>
#include <iostream>

using namespace boost;
using namespace boost::phoenix::arg_names;

class thread_pool
{
  private:
      mutex mx;
      condition_variable cv;

      typedef function<void()> job_t;
      std::deque<job_t> _queue;

      thread_group pool;

      boost::atomic_bool shutdown;
      static void worker_thread(thread_pool& q)
      {
          while (auto job = q.dequeue())
              (*job)();
      }

  public:
      thread_pool() : shutdown(false) {
          for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
              pool.create_thread(bind(worker_thread, ref(*this)));
      }

      void enqueue(job_t job) 
      {
          lock_guard<mutex> lk(mx);
          _queue.push_back(std::move(job));

          cv.notify_one();
      }

      // block until the task queue has been emptied (dequeued work may still be running; see CAVEATS)
      void drain() {
          unique_lock<mutex> lk(mx);
          namespace phx = boost::phoenix;
          cv.wait(lk, phx::empty(phx::ref(_queue)));
      }

      // block for the next job; returns boost::none once shutdown is requested and the queue is empty
      optional<job_t> dequeue()
      {
          unique_lock<mutex> lk(mx);
          namespace phx = boost::phoenix;

          cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));

          if (_queue.empty())
              return none;

          auto job = std::move(_queue.front());
          _queue.pop_front();

          cv.notify_all(); // in order to signal drain

          return std::move(job);
      }

      ~thread_pool()
      {
          shutdown = true;
          {
              lock_guard<mutex> lk(mx);
              cv.notify_all();
          }

          pool.join_all();
      }
};

void the_work(int id)
{
    std::cout << "worker " << id << " entered\n";

    // no more synchronization; the pool size determines max concurrency
    std::cout << "worker " << id << " start work\n";
    this_thread::sleep_for(chrono::milliseconds(2));
    std::cout << "worker " << id << " done\n";
}

int main()
{
    thread_pool pool; // uses 1 thread per core

    for (int pass = 0; pass < 20; ++pass) {
        for (int i = 0; i < 10; ++i)
            pool.enqueue(bind(the_work, i));

        pool.drain(); // make the queue empty, leave the threads
        std::cout << "Queue empty\n";
    }

    // destructing pool joins the worker threads
}
Faulkner answered 16/12, 2015 at 7:54
Faulkner: Added links to complete implementations of a proper thread_pool, including one based on Boost Asio's io_service.
Denti: Thanks for the very useful links, but the only thing missing is how to do "wait until ioService queue is empty" to replace the threadpool.join_all().
Faulkner: The destructor does it - just look for join really.
Denti: I don't think that's what I wanted to ask; as you say, you can only join once, but I want to keep using the thread pool, just wait for the job queue to empty. I could extend your Solution #2 to support that.
Faulkner: @KenY-N see update. Read the CAVEATS too (also try to word your question better next time. Your question clearly shows the join calls inside the loop. I'm happy to hear that's not what you actually want, but why is it in your question ...)
Denti: Indeed, I wasn't aware until you pointed it out that join was a once-only option. Thanks for the update to the code too!
