In short, you need to wrap the user's provided task with another function that will:
- Invoke the user function or callable object.
- Lock the mutex and decrement the counter.
I may not be understanding all the requirements for this thread pool. Thus, for clarity, here is an explicit list as to what I believe are the requirements:
- The pool manages the lifetime of the threads. The user should not be able to delete threads that reside within the pool.
- The user can assign a task to the pool in a non-intrusive way.
- When a task is being assigned, if all threads in the pool are currently running other tasks, then the task is discarded.
Before I provide an implementation, there are a few key points I would like to stress:
- Once a thread has been launched, it will run until completion, cancellation, or termination. The function the thread is executing cannot be reassigned. To allow for a single thread to execute multiple functions over the course of its life, the thread will want to launch with a function that will read from a queue, such as
io_service::run()
, and callable types are posted into the event queue, such as from io_service::post()
.
io_service::run()
returns if there is no work pending in the io_service
, the io_service
is stopped, or an exception is thrown from a handler that the thread was running. To prevent io_serivce::run()
from returning when there is no unfinished work, the io_service::work
class can be used.
- Defining the task's type requirements (i.e. the task's type must be callable by
object()
syntax) instead of requiring a type (i.e. task must inherit from process
), provides more flexibility to the user. It allows the user to supply a task as a function pointer or a type providing a nullary operator()
.
Implementation using boost::asio
:
#include <boost/asio.hpp>
#include <boost/thread.hpp>
class thread_pool
{
private:
boost::asio::io_service io_service_;
boost::asio::io_service::work work_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
public:
/// @brief Constructor.
thread_pool( std::size_t pool_size )
: work_( io_service_ ),
available_( pool_size )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &boost::asio::io_service::run,
&io_service_ ) );
}
}
/// @brief Destructor.
~thread_pool()
{
// Force all threads to return from io_service::run().
io_service_.stop();
// Suppress all exceptions.
try
{
threads_.join_all();
}
catch ( const std::exception& ) {}
}
/// @brief Adds a task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );
// If no threads are available, then return.
if ( 0 == available_ ) return;
// Decrement count, indicating thread is no longer available.
--available_;
// Post a wrapped task into the queue.
io_service_.post( boost::bind( &thread_pool::wrap_task, this,
boost::function< void() >( task ) ) );
}
private:
/// @brief Wrap a task so that the available count can be increased once
/// the user provided task has completed.
void wrap_task( boost::function< void() > task )
{
// Run the user supplied task.
try
{
task();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
// Task has finished, so increment count of available threads.
boost::unique_lock< boost::mutex > lock( mutex_ );
++available_;
}
};
A few comments about the implementation:
- Exception handling needs to occur around the user's task. If the user's function or callable object throws an exception that is not of type
boost::thread_interrupted
, then std::terminate()
is called. This is the the result of Boost.Thread's exceptions in thread functions behavior. It is also worth reading Boost.Asio's effect of exceptions thrown from handlers.
- If the user provides the
task
via boost::bind
, then the nested boost::bind
will fail to compile. One of the following options is required:
- Not support
task
created by boost::bind
.
- Meta-programming to perform compile-time branching based on whether or not the user's type if the result of
boost::bind
so that boost::protect
could be used, as boost::protect
only functions properly on certain function objects.
- Use another type to pass the
task
object indirectly. I opted to use boost::function
for readability at the cost of losing the exact type. boost::tuple
, while slightly less readable, could also be used to preserve the exact type, as seen in the Boost.Asio's serialization example.
Application code can now use the thread_pool
type non-intrusively:
void work() {};
struct worker
{
void operator()() {};
};
void more_work( int ) {};
int main()
{
thread_pool pool( 2 );
pool.run_task( work ); // Function pointer.
pool.run_task( worker() ); // Callable object.
pool.run_task( boost::bind( more_work, 5 ) ); // Callable object.
}
The thread_pool
could be created without Boost.Asio, and may be slightly easier for maintainers, as they no longer need to know about Boost.Asio
behaviors, such as when does io_service::run()
return, and what is the io_service::work
object:
#include <queue>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
class thread_pool
{
private:
std::queue< boost::function< void() > > tasks_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
boost::condition_variable condition_;
bool running_;
public:
/// @brief Constructor.
thread_pool( std::size_t pool_size )
: available_( pool_size ),
running_( true )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ;
}
}
/// @brief Destructor.
~thread_pool()
{
// Set running flag to false then notify all threads.
{
boost::unique_lock< boost::mutex > lock( mutex_ );
running_ = false;
condition_.notify_all();
}
try
{
threads_.join_all();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
}
/// @brief Add task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );
// If no threads are available, then return.
if ( 0 == available_ ) return;
// Decrement count, indicating thread is no longer available.
--available_;
// Set task and signal condition variable so that a worker thread will
// wake up andl use the task.
tasks_.push( boost::function< void() >( task ) );
condition_.notify_one();
}
private:
/// @brief Entry point for pool threads.
void pool_main()
{
while( running_ )
{
// Wait on condition variable while the task is empty and the pool is
// still running.
boost::unique_lock< boost::mutex > lock( mutex_ );
while ( tasks_.empty() && running_ )
{
condition_.wait( lock );
}
// If pool is no longer running, break out.
if ( !running_ ) break;
// Copy task locally and remove from the queue. This is done within
// its own scope so that the task object is destructed immediately
// after running the task. This is useful in the event that the
// function contains shared_ptr arguments bound via bind.
{
boost::function< void() > task = tasks_.front();
tasks_.pop();
lock.unlock();
// Run the task.
try
{
task();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
}
// Task has finished, so increment count of available threads.
lock.lock();
++available_;
} // while running_
}
};
io_service::run()
without anything posted, so it will finish immediately and futurepost()
-ed task cannot be executed); only first ThreadPool object ever created will create some threads; variables are not protected by mutexes (or in some other way). – Pimple