Thread pool using boost asio

I am trying to create a limited thread pool class using boost::asio, but I am stuck at one point. Can someone help me?

The only problem is the place where I should decrease the counter.

The code does not work as expected.

The problem is that I don't know when my thread will finish execution, and how I will come to know that it has returned to the pool.

#include <boost/asio.hpp>
#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>
#include <stack>

using namespace std;
using namespace boost;

class ThreadPool
{
    static int count;
    int NoOfThread;
    thread_group grp;
    mutex mutex_;
    asio::io_service io_service;
    int counter;
    stack<thread*> thStk ;

public:
    ThreadPool(int num)
    {   
        NoOfThread = num;
        counter = 0;
        mutex::scoped_lock lock(mutex_);

        if(count == 0)
            count++;
        else
            return;

        for(int i=0 ; i<num ; ++i)
        {
            thStk.push(grp.create_thread(boost::bind(&asio::io_service::run, &io_service)));
        }
    }
    ~ThreadPool()
    {
        io_service.stop();
        grp.join_all();
    }

    thread* getThread()
    {
        if(counter > NoOfThread)
        {
            cout<<"run out of threads \n";
            return NULL;
        }

        counter++;
        thread* ptr = thStk.top();
        thStk.pop();
        return ptr;
    }
};
int ThreadPool::count = 0;


struct callable
{
    void operator()()
    {
        cout<<"some task for thread \n";
    }
};

int main( int argc, char * argv[] )
{

    callable x;
    ThreadPool pool(10);
    thread* p = pool.getThread();
    cout<<p->get_id();

    // How can I assign some function to the thread pointer?
    // How can I return the thread pointer after the work is done,
    // so I can add it back to the stack?


    return 0;
}
Quadricycle answered 31/8, 2012 at 12:27 Comment(3)
@jupiter Thanks for editing, but this is not the final code: cplusplus.com/forum/general/77981. Please go through the link to the bottom; you will see the modified code at the end. – Quadricycle
Please go through the final code posted here: cplusplus.com/forum/general/77981 – Quadricycle
Decreasing the counter is not the only problem (or I looked at the wrong code; please edit your question). To name a few: your thread pool will not execute anything (you call io_service::run() without anything posted, so it finishes immediately and tasks post()-ed later cannot be executed); only the first ThreadPool object ever created will create any threads; and the variables are not protected by mutexes (or in some other way). – Pimple

In short, you need to wrap the user's provided task with another function that will:

  • Invoke the user function or callable object.
  • Lock the mutex and decrement the counter.

I may not be understanding all the requirements for this thread pool. Thus, for clarity, here is an explicit list of what I believe the requirements are:

  • The pool manages the lifetime of the threads. The user should not be able to delete threads that reside within the pool.
  • The user can assign a task to the pool in a non-intrusive way.
  • When a task is being assigned, if all threads in the pool are currently running other tasks, then the task is discarded.

Before I provide an implementation, there are a few key points I would like to stress:

  • Once a thread has been launched, it will run until completion, cancellation, or termination. The function the thread is executing cannot be reassigned. To allow a single thread to execute multiple functions over the course of its life, the thread should be launched with a function that reads from a queue, such as io_service::run(), with callable types posted into the event queue, for example via io_service::post().
  • io_service::run() returns if there is no work pending in the io_service, the io_service is stopped, or an exception is thrown from a handler that the thread was running. To prevent io_service::run() from returning when there is no unfinished work, the io_service::work class can be used (see the short sketch after this list).
  • Defining the task's type requirements (i.e. the task's type must be callable by object() syntax), instead of requiring a specific type (i.e. the task must inherit from process), provides more flexibility to the user. It allows the user to supply a task as a function pointer or a type providing a nullary operator().
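
For reference, a minimal standalone sketch (not the pool itself; the hello() function and the use of boost::scoped_ptr are just illustrative choices) of why io_service::run() returns immediately when nothing is pending, and how an io_service::work object changes that:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread.hpp>
#include <iostream>

void hello() { std::cout << "hello from pool thread\n"; }

int main()
{
  {
    // Without pending work, run() returns immediately, so the thread
    // exits at once and a task posted afterwards is never executed.
    boost::asio::io_service io_service;
    boost::thread thread( boost::bind( &boost::asio::io_service::run,
                                       &io_service ) );
    thread.join();            // Returns right away: nothing to do.
    io_service.post( hello ); // Queued, but no thread will ever run it.
  }

  {
    // An io_service::work object keeps run() from returning, so tasks
    // posted at any later time are picked up by the thread.
    boost::asio::io_service io_service;
    boost::scoped_ptr< boost::asio::io_service::work > work(
        new boost::asio::io_service::work( io_service ) );
    boost::thread thread( boost::bind( &boost::asio::io_service::run,
                                       &io_service ) );
    io_service.post( hello ); // Will be executed by the pool thread.
    work.reset();             // Let run() return once the queue drains.
    thread.join();
  }
}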

Implementation using boost::asio:

#include <boost/asio.hpp>
#include <boost/thread.hpp>

class thread_pool
{
private:
  boost::asio::io_service io_service_;
  boost::asio::io_service::work work_;
  boost::thread_group threads_;
  std::size_t available_;
  boost::mutex mutex_;
public:

  /// @brief Constructor.
  thread_pool( std::size_t pool_size )
    : work_( io_service_ ),
      available_( pool_size )
  {
    for ( std::size_t i = 0; i < pool_size; ++i )
    {
      threads_.create_thread( boost::bind( &boost::asio::io_service::run,
                                           &io_service_ ) );
    }
  }

  /// @brief Destructor.
  ~thread_pool()
  {
    // Force all threads to return from io_service::run().
    io_service_.stop();

    // Suppress all exceptions.
    try
    {
      threads_.join_all();
    }
    catch ( const std::exception& ) {}
  }

  /// @brief Adds a task to the thread pool if a thread is currently available.
  template < typename Task >
  void run_task( Task task )
  {
    boost::unique_lock< boost::mutex > lock( mutex_ );

    // If no threads are available, then return.
    if ( 0 == available_ ) return;

    // Decrement count, indicating thread is no longer available.
    --available_;

    // Post a wrapped task into the queue.
    io_service_.post( boost::bind( &thread_pool::wrap_task, this,
                                   boost::function< void() >( task ) ) );
  }

private:
  /// @brief Wrap a task so that the available count can be increased once
  ///        the user provided task has completed.
  void wrap_task( boost::function< void() > task )
  {
    // Run the user supplied task.
    try
    {
      task();
    }
    // Suppress all exceptions.
    catch ( const std::exception& ) {}

    // Task has finished, so increment count of available threads.
    boost::unique_lock< boost::mutex > lock( mutex_ );
    ++available_;
  }
};

A few comments about the implementation:

  • Exception handling needs to occur around the user's task. If the user's function or callable object throws an exception that is not of type boost::thread_interrupted, then std::terminate() is called. This is the result of Boost.Thread's exceptions in thread functions behavior. It is also worth reading Boost.Asio's effect of exceptions thrown from handlers.
  • If the user provides the task via boost::bind, then the nested boost::bind will fail to compile. One of the following options is required:
    • Not supporting tasks created by boost::bind.
    • Meta-programming to perform compile-time branching based on whether or not the user's type is the result of boost::bind, so that boost::protect can be used; boost::protect only functions properly on certain function objects (a short standalone sketch of boost::protect follows this list).
    • Use another type to pass the task object indirectly. I opted to use boost::function for readability at the cost of losing the exact type. boost::tuple, while slightly less readable, could also be used to preserve the exact type, as seen in Boost.Asio's serialization example.
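
As an aside, a small standalone sketch of the boost::protect option; the wrap_task here is a hypothetical stand-in for the pool's member function, used only to show the nested bind issue in isolation:

#include <boost/bind.hpp>
#include <boost/bind/protect.hpp>
#include <boost/function.hpp>
#include <iostream>

void more_work( int value ) { std::cout << "more_work(" << value << ")\n"; }

// Hypothetical stand-in for thread_pool::wrap_task.
template < typename Task >
void wrap_task( Task task )
{
  task();
}

int main()
{
  // Without boost::protect, the outer bind recognizes the inner expression
  // as a nested bind, evaluates it when invoked, and tries to pass its
  // (void) result to wrap_task, which fails to compile.  boost::protect
  // masks the inner expression so it is passed through as a callable.
  boost::function< void() > wrapped =
      boost::bind( &wrap_task< boost::function< void() > >,
                   boost::protect( boost::bind( more_work, 5 ) ) );
  wrapped(); // Prints "more_work(5)".
}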

Application code can now use the thread_pool type non-intrusively:

void work() {};

struct worker
{
  void operator()() {};
};

void more_work( int ) {};

int main()
{ 
  thread_pool pool( 2 );
  pool.run_task( work );                        // Function pointer.
  pool.run_task( worker() );                    // Callable object.
  pool.run_task( boost::bind( more_work, 5 ) ); // Callable object created by boost::bind.
}

The thread_pool could also be created without Boost.Asio, which may be slightly easier for maintainers, as they no longer need to know about Boost.Asio behaviors, such as when io_service::run() returns and what the io_service::work object does:

#include <queue>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

class thread_pool
{
private:
  std::queue< boost::function< void() > > tasks_;
  boost::thread_group threads_;
  std::size_t available_;
  boost::mutex mutex_;
  boost::condition_variable condition_;
  bool running_;
public:

  /// @brief Constructor.
  thread_pool( std::size_t pool_size )
    : available_( pool_size ),
      running_( true )
  {
    for ( std::size_t i = 0; i < pool_size; ++i )
    {
      threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ;
    }
  }

  /// @brief Destructor.
  ~thread_pool()
  {
    // Set running flag to false then notify all threads.
    {
      boost::unique_lock< boost::mutex > lock( mutex_ );
      running_ = false;
      condition_.notify_all();
    }

    try
    {
      threads_.join_all();
    }
    // Suppress all exceptions.
    catch ( const std::exception& ) {}
  }

  /// @brief Add task to the thread pool if a thread is currently available.
  template < typename Task >
  void run_task( Task task )
  {
    boost::unique_lock< boost::mutex > lock( mutex_ );

    // If no threads are available, then return.
    if ( 0 == available_ ) return;

    // Decrement count, indicating thread is no longer available.
    --available_;

    // Add the task and signal the condition variable so that a worker
    // thread will wake up and consume the task.
    tasks_.push( boost::function< void() >( task ) );
    condition_.notify_one();
  }

private:
  /// @brief Entry point for pool threads.
  void pool_main()
  {
    while( running_ )
    {
      // Wait on the condition variable while the task queue is empty and
      // the pool is still running.
      boost::unique_lock< boost::mutex > lock( mutex_ );
      while ( tasks_.empty() && running_ )
      {
        condition_.wait( lock );
      }
      // If pool is no longer running, break out.
      if ( !running_ ) break;

      // Copy task locally and remove from the queue.  This is done within
      // its own scope so that the task object is destructed immediately
      // after running the task.  This is useful in the event that the
      // function contains shared_ptr arguments bound via bind.
      {
        boost::function< void() > task = tasks_.front();
        tasks_.pop();

        lock.unlock();

        // Run the task.
        try
        {
          task();
        }
        // Suppress all exceptions.
        catch ( const std::exception& ) {}
      }

      // Task has finished, so increment count of available threads.
      lock.lock();
      ++available_;
    } // while running_
  }
};
Xylophagous answered 4/9, 2012 at 15:52 Comment(7)
How can we use the thread pool if we are also using other asio objects like boost::asio::ip::udp::socket, which internally post tasks for the asynchronous operations? – Ventricle
This is not a pool, since it stops adding tasks whenever the pool is full. New tasks should be kept in a container, waiting for execution. – Riane
@Riane It is a pool based on the characteristics of the original poster's question. As the characteristics are neither common nor clear, they are explicitly listed. Pools can use various strategies to manage the size of the task queue. – Xylophagous
@TannerSansbury Glad to know there are other strategies to manage the size; would you kindly give some information? – Riane
@Riane I have used various strategies, such as enqueueing until memory allocation fails, limiting the queue to a fixed maximum size, failing to enqueue based on heuristics (such as the oldest task having been sitting in the queue for a given period of time), or enqueueing but spawning off additional threads (permanent or temporary). – Xylophagous
@TannerSansbury Thanks, I grasp what you mean. Consider a scenario where tons of connections ask for a limited resource. Putting all the connections in a queue with no limit on capacity may bring down the server, so a queue of limited capacity would be necessary. Am I right? – Riane
@Riane Yes, that could be one scenario. I often see fixed-size queues used on embedded devices to manage memory usage and prevent free-space fragmentation. – Xylophagous
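
Following up on the queue-size discussion in the comments above, here is one possible sketch of the fixed-maximum-size strategy. It is not part of the original answer; the bounded_task_queue name, the max_size_ limit, and the try_push()/wait_and_pop() interface are all assumptions made for illustration:

#include <cstddef>
#include <queue>
#include <boost/function.hpp>
#include <boost/thread.hpp>

// Hypothetical bounded task queue: enqueueing fails once the backlog
// reaches a fixed maximum size, rather than growing without limit.
class bounded_task_queue
{
public:
  explicit bounded_task_queue( std::size_t max_size )
    : max_size_( max_size )
  {}

  /// @brief Returns false instead of enqueueing when the backlog is full.
  bool try_push( const boost::function< void() >& task )
  {
    boost::unique_lock< boost::mutex > lock( mutex_ );
    if ( tasks_.size() >= max_size_ ) return false;
    tasks_.push( task );
    condition_.notify_one();
    return true;
  }

  /// @brief Blocks until a task is available, then pops and returns it.
  boost::function< void() > wait_and_pop()
  {
    boost::unique_lock< boost::mutex > lock( mutex_ );
    while ( tasks_.empty() )
    {
      condition_.wait( lock );
    }
    boost::function< void() > task = tasks_.front();
    tasks_.pop();
    return task;
  }

private:
  std::queue< boost::function< void() > > tasks_;
  std::size_t max_size_;
  boost::mutex mutex_;
  boost::condition_variable condition_;
};

A pool thread's main loop could then call wait_and_pop() and run the returned task, much like pool_main() above, while producers call try_push() and handle a false return (for example, by rejecting the connection).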
