Thread pooling in C++11
Asked Answered
R

12

200

Relevant questions:

About C++11:

About Boost:


How do I get a pool of threads to send tasks to, without creating and deleting them over and over again? This means persistent threads to resynchronize without joining.


I have code that looks like this:

namespace {
  std::vector<std::thread> workers;

  int total = 4;
  int arr[4] = {0};

  void each_thread_does(int i) {
    arr[i] += 2;
  }
}

int main(int argc, char *argv[]) {
  for (int i = 0; i < 8; ++i) { // for 8 iterations,
    for (int j = 0; j < 4; ++j) {
      workers.push_back(std::thread(each_thread_does, j));
    }
    for (std::thread &t: workers) {
      if (t.joinable()) {
        t.join();
      }
    }
    arr[4] = std::min_element(arr, arr+4);
  }
  return 0;
}

Instead of creating and joining threads each iteration, I'd prefer to send tasks to my worker threads each iteration and only create them once.

Rusk answered 1/4, 2013 at 21:59 Comment(4)
here's a related question and my answer.Roubaix
thought about using tbb (it's Intel, but free & open source, and does exactly what you want: you simply submit (recursively divisible) tasks and don't worry about the threads)?Antofagasta
This FOSS project is my attempt to create a thread pool library, check it out if you want. -> code.google.com/p/threadpool11Viaduct
What's wrong with using tbb?Antofagasta
K
205

This is adapted from my answer to another very similar post.

Let's build a ThreadPool class:

class ThreadPool {
public:
    void Start();
    void QueueJob(const std::function<void()>& job);
    void Stop();
    bool busy();

private:
    void ThreadLoop();

    bool should_terminate = false;           // Tells threads to stop looking for jobs
    std::mutex queue_mutex;                  // Prevents data races to the job queue
    std::condition_variable mutex_condition; // Allows threads to wait on new jobs or termination 
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> jobs;
};
  1. ThreadPool::Start

For an efficient threadpool implementation, once threads are created according to num_threads, it's better not to create new ones or destroy old ones (by joining). There will be a performance penalty, and it might even make your application go slower than the serial version. Thus, we keep a pool of threads that can be used at any time (if they aren't already running a job).

Each thread should be running its own infinite loop, constantly waiting for new tasks to grab and run.

void ThreadPool::Start() {
    const uint32_t num_threads = std::thread::hardware_concurrency(); // Max # of threads the system supports
    for (uint32_t ii = 0; ii < num_threads; ++ii) {
        threads.emplace_back(std::thread(&ThreadPool::ThreadLoop,this))
    }
}
  1. ThreadPool::ThreadLoop

The infinite loop function. This is a while (true) loop waiting for the task queue to open up.

void ThreadPool::ThreadLoop() {
    while (true) {
        std::function<void()> job;
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            mutex_condition.wait(lock, [this] {
                return !jobs.empty() || should_terminate;
            });
            if (should_terminate) {
                return;
            }
            job = jobs.front();
            jobs.pop();
        }
        job();
    }
}
  1. ThreadPool::QueueJob

Add a new job to the pool; use a lock so that there isn't a data race.

void ThreadPool::QueueJob(const std::function<void()>& job) {
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        jobs.push(job);
    }
    mutex_condition.notify_one();
}

To use it:

thread_pool->QueueJob([] { /* ... */ });
  1. ThreadPool::busy
bool ThreadPool::busy() {
    bool poolbusy;
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        poolbusy = !jobs.empty();
    }
    return poolbusy;
}

The busy() function can be used in a while loop, such that the main thread can wait the threadpool to complete all the tasks before calling the threadpool destructor.

  1. ThreadPool::Stop

Stop the pool.

void ThreadPool::Stop() {
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        should_terminate = true;
    }
    mutex_condition.notify_all();
    for (std::thread& active_thread : threads) {
        active_thread.join();
    }
    threads.clear();
}

Once you integrate these ingredients, you have your own dynamic threading pool. These threads always run, waiting for job to do.

I apologize if there are some syntax errors, I typed this code and and I have a bad memory. Sorry that I cannot provide you the complete thread pool code; that would violate my job integrity.

Notes:

  • The anonymous code blocks are used so that when they are exited, the std::unique_lock variables created within them go out of scope, unlocking the mutex.
  • ThreadPool::Stop will not terminate any currently running jobs, it just waits for them to finish via active_thread.join().
Keil answered 15/9, 2015 at 19:12 Comment(18)
How do you have a vector<thread> when thread(const thread&) = delete?Beyrouth
@ChristopherPisz std::vector does not require its elements to be copyable. You can use vectors with move-only types (unique_ptr, thread, future, etc.).Nazareth
in your above example, how do you stop the pool ? Should the condition.wait also look for a variable stop_ and check if (stop_ == true) { break;} ?Electrotechnics
@John, please see the shutdown method above.Keil
In shutdown(), it should be thread_vector.clear(); instead of thread_vector.empty(); Correct?Lullaby
Can the 'Some_Method' in line Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object)); be a private method accessing other private members of SomeClass?Xantho
What happens when you terminate and there are no jobs left?Tade
"Infinite_loop_function" is a funny name for a function that consumes tasks from a queue and performs them.Strontium
Don't Queue and terminate_pool need to be captures for condition variable lambda?Approachable
@Tade I think it causes segment fault or some undefined behavior. Either the thread needs to check terminate_pool after waking up or some kind of poison pill needs to be put on job queue to notify the threads to end the infinite loop.Claypoole
Minor nit pick: You said, "...destroy old ones (by joining)." A newbie might read that, and come away with the idea that t.join() somehow "destroys" thread t. I've seen more than a few questions in this forum in which the heart of the misunderstanding was that the OP thought t.join() would do something to thread t.Strontium
added the "busy" functionKeil
I guess that in method Start(), threads.at(i) = std::thread(ThreadLoop); should be threads.at(i) = std::thread(&ThreadPool::ThreadLoop, this);Gally
I think there's an error: poolbusy = jobs.empty() should be poolbusy = !jobs.empty()Catenane
Shouldn't busy() have the return type bool ?Oftentimes
@ph_0, you are right, typos correctedKeil
@PeterK, you are right, typos correctedKeil
@IgnacioMartin, you are right, updated to correct syntax, also used emplace_back, which carries more clear meaning than "at"Keil
F
114

You can use C++ Thread Pool Library, https://github.com/vit-vit/ctpl.

Then the code your wrote can be replaced with the following

#include <ctpl.h>  // or <ctpl_stl.h> if ou do not have Boost library

int main (int argc, char *argv[]) {
    ctpl::thread_pool p(2 /* two threads in the pool */);
    int arr[4] = {0};
    std::vector<std::future<void>> results(4);
    for (int i = 0; i < 8; ++i) { // for 8 iterations,
        for (int j = 0; j < 4; ++j) {
            results[j] = p.push([&arr, j](int){ arr[j] +=2; });
        }
        for (int j = 0; j < 4; ++j) {
            results[j].get();
        }
        arr[4] = std::min_element(arr, arr + 4);
    }
}

You will get the desired number of threads and will not create and delete them over and over again on the iterations.

Frogmouth answered 5/9, 2014 at 11:20 Comment(8)
This should be the answer; single-header, readable, straightforward, concise and standard-compliant C++11 library. Great work!Scanties
@Frogmouth can you give an example with a function please? how do you push a class member function at results[j] = p.push([&arr, j](int){ arr[j] +=2; });Chondroma
@HaniGoc Just capture the instance by reference.Scanties
@Frogmouth Sent you a pull request to improve the STL version.Scanties
@vit-vit: It's difficult to contact the maintainer of that library with questions, hint hint.Reorganization
@HaniGoc I was trying for ages to make this work using bind... hooray for lambdas! >.<Muscadel
How do you implement join() on the created threads?Toplofty
@Toplofty There is no need to join(), cause those thread will never "completed" except you destruct p or resize p.thread number. But the jobs you have pushed will be completed. results[j].get() is a blocking function and it is a "join()" to those jobs. you can also call results[j].wait() to make get() not blocking.Greenland
E
74

A pool of threads means that all your threads are running, all the time – in other words, the thread function never returns. To give the threads something meaningful to do, you have to design a system of inter-thread communication, both for the purpose of telling the thread that there's something to do, as well as for communicating the actual work data.

Typically this will involve some kind of concurrent data structure, and each thread would presumably sleep on some kind of condition variable, which would be notified when there's work to do. Upon receiving the notification, one or several of the threads wake up, recover a task from the concurrent data structure, process it, and store the result in an analogous fashion.

The thread would then go on to check whether there's even more work to do, and if not go back to sleep.

The upshot is that you have to design all this yourself, since there isn't a natural notion of "work" that's universally applicable. It's quite a bit of work, and there are some subtle issues you have to get right. (You can program in Go if you like a system which takes care of thread management for you behind the scenes.)

Euripides answered 1/4, 2013 at 22:27 Comment(2)
"you have to design all this yourself" <- that is what I'm trying to avoid doing. Goroutines seem fantastic, though.Rusk
@Yktula: Well, it's a highly non-trivial task. It's not even clear from your post what kind of work you want done, and that's profoundly fundamental to the solution. You can implement Go in C++, but it'll be a very specific thing, and half the people would complain that they'd want something different.Euripides
R
21

A threadpool is at core a set of threads all bound to a function working as an event loop. These threads will endlessly wait for a task to be executed, or their own termination.

The threadpool job is to provide an interface to submit jobs, define (and perhaps modify) the policy of running these jobs (scheduling rules, thread instantiation, size of the pool), and monitor the status of the threads and related resources.

So for a versatile pool, one must start by defining what a task is, how it is launched, interrupted, what is the result (see the notion of promise and future for that question), what sort of events the threads will have to respond to, how they will handle them, how these events shall be discriminated from the ones handled by the tasks. This can become quite complicated as you can see, and impose restrictions on how the threads will work, as the solution becomes more and more involved.

The current tooling for handling events is fairly barebones(*): primitives like mutexes, condition variables, and a few abstractions on top of that (locks, barriers). But in some cases, these abstrations may turn out to be unfit (see this related question), and one must revert to using the primitives.

Other problems have to be managed too:

  • signal
  • i/o
  • hardware (processor affinity, heterogenous setup)

How would these play out in your setting?

This answer to a similar question points to an existing implementation meant for boost and the stl.

I offered a very crude implementation of a threadpool for another question, which doesn't address many problems outlined above. You might want to build up on it. You might also want to have a look of existing frameworks in other languages, to find inspiration.


(*) I don't see that as a problem, quite to the contrary. I think it's the very spirit of C++ inherited from C.

Roubaix answered 1/4, 2013 at 23:33 Comment(1)
the "This answer" link links to the question, I didn't find the answer you refer to.Glaucoma
E
13
Follwoing [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece) suggestion, I implemented the thread pool:

function_pool.h

#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>

class Function_pool
{

private:
    std::queue<std::function<void()>> m_function_queue;
    std::mutex m_lock;
    std::condition_variable m_data_condition;
    std::atomic<bool> m_accept_functions;

public:

    Function_pool();
    ~Function_pool();
    void push(std::function<void()> func);
    void done();
    void infinite_loop_func();
};

function_pool.cpp

#include "function_pool.h"

Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}

Function_pool::~Function_pool()
{
}

void Function_pool::push(std::function<void()> func)
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_function_queue.push(func);
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    lock.unlock();
    m_data_condition.notify_one();
}

void Function_pool::done()
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_accept_functions = false;
    lock.unlock();
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    m_data_condition.notify_all();
    //notify all waiting threads.
}

void Function_pool::infinite_loop_func()
{
    std::function<void()> func;
    while (true)
    {
        {
            std::unique_lock<std::mutex> lock(m_lock);
            m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
            if (!m_accept_functions && m_function_queue.empty())
            {
                //lock will be release automatically.
                //finish the thread loop and let it join in the main thread.
                return;
            }
            func = m_function_queue.front();
            m_function_queue.pop();
            //release the lock
        }
        func();
    }
}

main.cpp

#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>

Function_pool func_pool;

class quit_worker_exception : public std::exception {};

void example_function()
{
    std::cout << "bla" << std::endl;
}

int main()
{
    std::cout << "stating operation" << std::endl;
    int num_threads = std::thread::hardware_concurrency();
    std::cout << "number of threads = " << num_threads << std::endl;
    std::vector<std::thread> thread_pool;
    for (int i = 0; i < num_threads; i++)
    {
        thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
    }

    //here we should send our functions
    for (int i = 0; i < 50; i++)
    {
        func_pool.push(example_function);
    }
    func_pool.done();
    for (unsigned int i = 0; i < thread_pool.size(); i++)
    {
        thread_pool.at(i).join();
    }
}
Epsomite answered 18/7, 2018 at 10:46 Comment(3)
Thanks! This really helped me get started with parallel threading operations. I ended up using a slightly modified version of your implementation.Quitrent
You dont need m_accept_functions to be atomic type. m_accept_functions protected by mutex.Judenberg
It's good that we can call join()Toplofty
C
12

You can use thread_pool from boost library:

void my_task(){...}

int main(){
    int threadNumbers = thread::hardware_concurrency();
    boost::asio::thread_pool pool(threadNumbers);

    // Submit a function to the pool.
    boost::asio::post(pool, my_task);

    // Submit a lambda object to the pool.
    boost::asio::post(pool, []() {
      ...
    });
}

You also can use threadpool from open source community:

void first_task() {...}    
void second_task() {...}

int main(){
    int threadNumbers = thread::hardware_concurrency();
    pool tp(threadNumbers);

    // Add some tasks to the pool.
    tp.schedule(&first_task);
    tp.schedule(&second_task);
}
Checkerberry answered 20/6, 2019 at 8:41 Comment(0)
G
4

Edit: This now requires C++17 and concepts. (As of 9/12/16, only g++ 6.0+ is sufficient.)

The template deduction is a lot more accurate because of it, though, so it's worth the effort of getting a newer compiler. I've not yet found a function that requires explicit template arguments.

It also now takes any appropriate callable object (and is still statically typesafe!!!).

It also now includes an optional green threading priority thread pool using the same API. This class is POSIX only, though. It uses the ucontext_t API for userspace task switching.


I created a simple library for this. An example of usage is given below. (I'm answering this because it was one of the things I found before I decided it was necessary to write it myself.)

bool is_prime(int n){
  // Determine if n is prime.
}

int main(){
  thread_pool pool(8); // 8 threads

  list<future<bool>> results;
  for(int n = 2;n < 10000;n++){
    // Submit a job to the pool.
    results.emplace_back(pool.async(is_prime, n));
  }

  int n = 2;
  for(auto i = results.begin();i != results.end();i++, n++){
    // i is an iterator pointing to a future representing the result of is_prime(n)
    cout << n << " ";
    bool prime = i->get(); // Wait for the task is_prime(n) to finish and get the result.
    if(prime)
      cout << "is prime";
    else
      cout << "is not prime";
    cout << endl;
  }  
}

You can pass async any function with any (or void) return value and any (or no) arguments and it will return a corresponding std::future. To get the result (or just wait until a task has completed) you call get() on the future.

Here's the github: https://github.com/Tyler-Hardin/thread_pool.

Godspeed answered 15/6, 2014 at 18:57 Comment(5)
Looks amazing, but would be great to have a comparison to vit-vit's header!Scanties
@Sh3ljohn, from glancing at it, it appears they're basically the same in API. vit-vit uses boost's lockfree queue, which is better than mine. (But my goal was specifically to do it with std::* only. I suppose I could implement the lockfree queue myself, but that sounds hard and error prone.) Also, vit-vit's doesn't have an associated .cpp, which is simpler to use for people who don't know what they're doing. (E.g. github.com/Tyler-Hardin/thread_pool/issues/1)Godspeed
He/she also has a stl-only solution that I've been forking for the last few hours, at first it looked more complicated than yours with shared pointers all over the place, but this is actually needed to handle hot-resizing properly.Scanties
@Sh3ljohn, ah, I didn't notice the hot resizing. That's nice. I chose not to worry about it because it's not really within the intended use case. (I can't think of a case where I'd want to resize, personally, but that could be due to a lack of imagination.)Godspeed
Example use-case: you're running a RESTful API on a server and need to reduce temporarily the resource allocation for maintenance purposes, without needing to shutdown the service completely.Scanties
S
4

Something like this might help (taken from a working app).

#include <memory>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

struct thread_pool {
  typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;

  thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) {
    for (int i = 0; i < threads; ++i) {
      auto worker = [this] { return service.run(); };
      grp.add_thread(new boost::thread(worker));
    }
  }

  template<class F>
  void enqueue(F f) {
    service.post(f);
  }

  ~thread_pool() {
    service_worker.reset();
    grp.join_all();
    service.stop();
  }

private:
  boost::asio::io_service service;
  asio_worker service_worker;
  boost::thread_group grp;
};

You can use it like this:

thread_pool pool(2);

pool.enqueue([] {
  std::cout << "Hello from Task 1\n";
});

pool.enqueue([] {
  std::cout << "Hello from Task 2\n";
});

Keep in mind that reinventing an efficient asynchronous queuing mechanism is not trivial.

Boost::asio::io_service is a very efficient implementation, or actually is a collection of platform-specific wrappers (e.g. it wraps I/O completion ports on Windows).

Severalty answered 27/1, 2016 at 13:8 Comment(2)
Is that much boost necessary with C++11? Wouldn't, say, an std::thread suffice?Reorganization
There is no equivalent in std for boost::thread_group. boost::thread_group is a collection of boost::thread instances. But of course, it's very easy to replace boost::thread_group with a vector of std::threads.Severalty
I
1

looks like threadpool is very popular problem/exercise :-)

I recently wrote one in modern C++; it’s owned by me and publicly available here - https://github.com/yurir-dev/threadpool

It supports templated return values, core pinning, ordering of some tasks. all implementation in two .h files.

So, the original question will be something like this:

#include "tp/threadpool.h"

int arr[5] = { 0 };

concurency::threadPool<void> tp;
tp.start(std::thread::hardware_concurrency());

std::vector<std::future<void>> futures;
for (int i = 0; i < 8; ++i) { // for 8 iterations,
    for (int j = 0; j < 4; ++j) {
        futures.push_back(tp.push([&arr, j]() {
               arr[j] += 2;
            }));
    }
}

// wait until all pushed tasks are finished.
for (auto& f : futures)
    f.get();
// or just tp.end(); // will kill all the threads

arr[4] = *std::min_element(arr, arr + 4);
Incombustible answered 14/1, 2022 at 23:13 Comment(7)
Please be sure to read Stack Overflow's self-promotion policy when referencing your own content.Destitution
@JeremyCaney What's the problem with that? He's not selling anything, just showing his publically available FOSS library.Endocentric
@original.roland: If you have questions on the self-promotion rules, I recommend bringing them up on Meta Stack Exchange.Destitution
@JeremyCaney I don't have questions on the self-promoting rules, I am totally fine with that, just can't see how this answer would break any of the rules. Or did you just intend to randomly remember yurir to read the policy?Endocentric
@original.roland: At minimum, when referencing their own content, they should acknowledge that it's their own content. It's not a particularly big deal in this case, and an easy fix, which is why I reminded them of the policy without e.g. flagging the answer. My assumption is simply that they weren't aware of the policy. They should, however, edit their answer to acknowledge that they are the owner of the linked repository.Destitution
@JeremyCaney Thank you for explaining :)Endocentric
@JeremyCaney: Thank you for explanation. At first, I did not understand what was the problem. I edited the answer, hope now it fits the rules.Incombustible
A
1

You can use the single header library task-thread-pool, then your code becomes:

#include <algorithm> // for std::min_element
#include <task_thread_pool.hpp>

int main () {
    task_thread_pool::task_thread_pool pool;

    int arr[5] = {0};  // not arr[4] because original code had an out-of-bounds error accessing arr[4].

    for (int i = 0; i < 8; ++i) { // for 8 iterations,
        for (int j = 0; j < 4; ++j) {
            pool.submit_detach([&arr, j]{ arr[j] += 2; });
        }

        // Wait for all tasks to complete.
        // Could also use submit() which returns a future,
        // but then we'd have to call get() on all those futures.
        pool.wait_for_tasks();

        arr[4] = *std::min_element(arr, arr + 4);
    }

    return 0;
}

This creates and reuses threads until all tasks are complete.

Works on C++11 and newer.

Airtight answered 22/5, 2023 at 20:52 Comment(0)
A
0

I found the pending tasks' future.get() call hangs on caller side if the thread pool gets terminated and leaves some tasks inside task queue. How to set future exception inside thread pool with only the wrapper std::function?

template <class F, class... Args>
std::future<std::result_of_t<F(Args...)>> enqueue(F &&f, Args &&...args) {
    auto task = std::make_shared<std::packaged_task<std::result_of_t<F(Args...)>()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(_mutex);
        _tasks.push([task]() -> void { (*task)(); });
    }
    return res;
}

class StdThreadPool {
    std::vector<std::thread> _workers;
    std::priority_queue<TASK> _tasks;
    ...
}

struct TASK {
    //int _func_return_value;
    std::function<void()> _func;
    int priority;
    ...
}
Adlay answered 3/12, 2022 at 5:52 Comment(0)
F
0

The Stroika library has a threadpool implementation.

Stroika ThreadPool.h

ThreadPool p;
p.AddTask ([] () {doIt ();});

Stroika's thread library also supports cancelation (cooperative) - so that when the ThreadPool above goes out of scope - it cancels any running tasks (similar to c++20's jthread).

Freezedry answered 8/12, 2022 at 4:8 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.