Managing threads while following modern C++17 best practices
Originally I had thought about designing a ThreadManager class to store threads along with the data type objects and function type objects that they would work with. The class was to be responsible for the managing of memory, access, transferring, releasing, locking, unlocking, joining, and other typical common functionalities of the associated types within the standard multithreading library. It was originally intended to associate the containing thread and its id with a specific set of resources that a particular thread has access to.

After reading the cppreference docs on mutex, shared_mutex, lock_guard, shared_lock, std::function<...>, etc., I now know that mutexes and lock_guards are non-copyable. I also realized that if I template the class to store arbitrary function objects, function pointers, lambdas, or std::function<>s as a std::function<> in its container, any instantiation of this intended singleton could store only one specific function signature and nothing else.
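One common way around the single-signature limitation is to bind each callable to its arguments and store the result as a std::function<void()>. The sketch below assumes that approach; the TaskList class and its member names are hypothetical and only illustrate the idea.

```cpp
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical helper: binds a callable to its arguments so that callables
// with different signatures can share one container of std::function<void()>.
class TaskList {
public:
    template <class F, class... Args>
    void add(F f, Args... args) {
        // The callable and its arguments are copied into the closure, so the
        // stored task takes no parameters and returns nothing.
        tasks_.emplace_back([f = std::move(f), args...]() mutable { f(args...); });
    }

    // Runs every stored task in insertion order.
    void run_all() {
        for (auto& task : tasks_) task();
    }

    std::size_t size() const { return tasks_.size(); }

private:
    std::vector<std::function<void()>> tasks_;
};
```

Return values are discarded here; a std::packaged_task or std::promise would be needed to get results back.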

Given these behaviors and properties of the standard library's multithreading facilities (mutexes, shared_mutexes, lock_guards, threads, promises, futures, etc.), it occurred to me that I'm overthinking the overall design of this class.

You can refer to my initial design attempt in a previous question of mine, "Storing arbitrary function objects into a class member container without knowing their declaration signature". It should give you an idea of what I was attempting to do.


Understanding a little more about their behaviors, properties and responsibilities I would like to know if the following would be appropriate for the intended design process.

Instead of storing any mutexes, lock_guards, threads, data type objects or function objects; would it make more sense to just store the generated ids of the threads, and have my manager class act more like a monitoring, recording, and reporting type class instead?

My new intention is that the container would be an associative map keyed by the thread's ID, with a common struct as the value. The struct would contain a property list of all the responsibilities and actions of the combined resources. This may then allow support for some of the following features: a priority queue, a task scheduler, and a dispatcher of commands to send and fetch resources depending on whether the thread is available. These actions would not be done by this class directly but through generic function templates.

For example:

struct ThreadProperties {
    // thread specific
    id,
    slept for,
    is locked,
    is waiting,
    is joined,
    has mutex, if so hold id to mutex - lockguard
    is shared,
    is active,
    has promise or future...
    who has ownership of,
    marked for release,
    marked for transfer

    // other mutex and lock_guard properties

    // function object address stored as `size_t` to represent an id
    // data object address stored as `size_t` to represent an id

    // etc.

};

class ThreadManager {
private:
    std::map<unsigned, ThreadProperties> threadTable;
public:
    default constructor

    storeIds into relevant containers

    store properties into relevant containers

    associate above containers into a map or lookup table

    find or look for specific ID and if found 
    check to see its current status and report it
    also check to see if it's in a priority queue 
    or task scheduler and determine if it is ready
    to do something, or change its internal state.

    other common methods of functionality associated with threads.
};

// function templates to act on threads according to the reporting of the manager above.
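A minimal sketch of the monitoring-only idea, assuming the table is keyed by std::thread::id (which is ordered and so usable as a map key) instead of unsigned. ThreadStatus, ThreadMonitor, report, and query are hypothetical names, and the struct holds only a small subset of the property list above.

```cpp
#include <cstddef>
#include <map>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <utility>

// Hypothetical status record: a small subset of the properties listed above.
struct ThreadStatus {
    std::string label;
    bool active = false;
    bool waiting = false;
};

// Hypothetical monitor: it owns no threads, it only records what they report.
class ThreadMonitor {
public:
    // Records or overwrites the status for a thread id.
    void report(std::thread::id id, ThreadStatus status) {
        std::lock_guard<std::mutex> lock(mutex_);
        table_[id] = std::move(status);
    }

    // Returns the last reported status, or an empty optional for an unknown id.
    std::optional<ThreadStatus> query(std::thread::id id) const {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = table_.find(id);
        if (it == table_.end()) return std::nullopt;
        return it->second;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return table_.size();
    }

private:
    mutable std::mutex mutex_;  // guards table_, since many threads may report
    std::map<std::thread::id, ThreadStatus> table_;
};
```

Note that the recorded status is only a snapshot: by the time a caller reads it, the thread may already have moved on.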

I intend to implement this kind of design while following modern C++ best practices, targeting C++17. Is this design appropriate for a class that should be generic, modular, portable, and efficient?

Decolorize answered 22/3, 2018 at 23:13 Comment(1)
You might want to take a look at Go's goroutines (coroutines). They are not difficult to implement in C++. IMO, it's the easiest model for users, but the runtime system is very intrusive (user stack). - Philipps

Threads and other std primitives are like raw pointers. You should build up a concurrency model that doesn't expose anything that low level. The std threading primitives provide you with enough tools to do so.

Learn about some of the new stuff headed down the pipe: executors, streams, coroutines, range-v3, monadic futures, etc. Model your library around that.

Trying to make well behaved code based around raw use of mutexes, threads going to sleep and waking up, blocking, atomics and shared data is a trap.
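To illustrate the point with tools already in the standard library, here is a small sketch that splits work across two threads without the caller ever touching a thread, mutex, or condition variable. The function name parallel_sum is hypothetical, and std::async stands in for whatever higher-level model you end up building.

```cpp
#include <cstddef>
#include <future>
#include <numeric>
#include <vector>

// Sums a vector in two halves. The caller sees only a value; the thread
// and its synchronization stay hidden behind std::async and std::future.
inline long long parallel_sum(const std::vector<int>& values) {
    auto middle = values.begin() + static_cast<std::ptrdiff_t>(values.size() / 2);

    // The first half runs as an asynchronous task ...
    std::future<long long> first = std::async(std::launch::async, [&values, middle] {
        return std::accumulate(values.begin(), middle, 0LL);
    });

    // ... while the calling thread sums the second half.
    long long second = std::accumulate(middle, values.end(), 0LL);

    // get() is the only synchronization point.
    return first.get() + second;
}
```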

As an example:

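#include <future>
#include <type_traits>

// Default task used when do_task() is called without an argument.
struct do_nothing { void operator()() const {} };

struct thread_pool;

// A std::future augmented with a continuation and the pool it runs on.
// Note: std::result_of_t is deprecated in C++17; std::invoke_result_t replaces it.
template<class T>
struct my_future:std::future<T> {
  // Runs f on the result once it is ready; consumes *this.
  template<class F>
  auto then( F&& f )&&
  -> std::future< std::result_of_t<F(T&&)> >;
  thread_pool* executor = nullptr;
};
template<>
struct my_future<void>:std::future<void> {
  template<class F>
  auto then( F&& f )&&
  -> std::future< std::result_of_t<F()> >;
  thread_pool* executor = nullptr;
};
struct thread_pool {
  // Queues f and returns a future for its result.
  template<class F=do_nothing>
  my_future<std::result_of_t<F()>> do_task(F&& f={});
};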

Here we talk about piping data from task to task to task and ending in an augmented future<T>. Augment it with the ability to split (via shared_future) and merge (future<X> joined with future<Y> to produce future<X, Y>).
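The declarations above leave the implementation open. As a rough sketch of what chaining and merging could look like, the free functions then and merge below are hypothetical and built directly on std::async. They assume non-void result types and a callable that returns by value, and they spend one blocked task per continuation, which a real executor would avoid. The merged result is expressed as a future of std::tuple<X, Y>.

```cpp
#include <future>
#include <tuple>
#include <type_traits>
#include <utility>

// Hypothetical continuation: runs fn on the result of input once it is ready.
// Assumes T is not void and fn returns by value.
template <class T, class F>
auto then(std::future<T> input, F fn) -> std::future<std::invoke_result_t<F, T>> {
    return std::async(std::launch::async,
        [input = std::move(input), fn = std::move(fn)]() mutable {
            return fn(input.get());  // blocks this task until the input is ready
        });
}

// Hypothetical merge: joins two futures into one future holding both results.
template <class X, class Y>
std::future<std::tuple<X, Y>> merge(std::future<X> x, std::future<Y> y) {
    return std::async(std::launch::async,
        [x = std::move(x), y = std::move(y)]() mutable {
            X a = x.get();
            Y b = y.get();
            return std::tuple<X, Y>(std::move(a), std::move(b));
        });
}
```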

Maybe go a step further and build a stream based system:

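#include <functional>

// A sink consumes values; a source pushes values into a sink;
// a pipe reads from a source and writes to a sink.
template<class In>
using sink = std::function<void(In)>;
template<class Out>
using source = std::function<void(sink<Out>)>;
template<class In, class Out>
using pipe = std::function<void(source<In>, sink<Out>)>;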

and then support turning a source into an async source.
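As a small synchronous illustration of how such aliases might be used, the sketch below assumes a source is a function that pushes values into a sink and a pipe is a function taking a source and a sink. The helpers from_vector, square, and connect are hypothetical names introduced only for this example.

```cpp
#include <functional>
#include <utility>
#include <vector>

template <class In>
using sink = std::function<void(In)>;
template <class Out>
using source = std::function<void(sink<Out>)>;
template <class In, class Out>
using pipe = std::function<void(source<In>, sink<Out>)>;

// Hypothetical source: pushes every element of a vector into the sink.
inline source<int> from_vector(std::vector<int> values) {
    return [values = std::move(values)](sink<int> out) {
        for (int v : values) out(v);
    };
}

// Hypothetical pipe: forwards the square of each incoming value.
inline pipe<int, int> square() {
    return [](source<int> in, sink<int> out) {
        in([out](int v) { out(v * v); });
    };
}

// Attaching a pipe to a source yields a new source of the pipe's output type.
template <class In, class Out>
source<Out> connect(source<In> in, pipe<In, Out> p) {
    return [in = std::move(in), p = std::move(p)](sink<Out> out) { p(in, out); };
}
```

Nothing runs until a sink is attached, which is what makes it possible to later decide on which thread the source should produce its values.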

And instead of building up some huge castle of abstraction and hoping it is complete, read about these things, and when you run into a problem that one of these things solve implement just enough to solve your problem. You aren't writing the be-all end-all thread system from scratch the first time you try, so don't try. Write something useful, and then write a better one next time.

Telephoto answered 23/3, 2018 at 20:9 Comment(4)
I appreciate your suggestions, and a visual example with a good solid explanation of the pitfalls involved. This does sound very reasonable and should be suitable for my needs. - Decolorize
I added my own answer based on doing a lot of research and I think I found something that might fit my needs. - Decolorize
@FrancisCugler Well, here is one I wrote a year ago. Note I split the queue of tasks from the threading engine (I find thread safe queues useful without threading engines). I think the last one I wrote (not certain where) I added a method to "clear" the queue atomically, and used empty-function-objects to signal "terminate thread". So stop was "clear queue, fill with terminate thread tokens, return future that indicated all threads had terminated". - Telephoto
Very nice and informative; nicely structured with a solid explanation. It is quite different from my older library that used Windows' CRITICAL_SECTIONs... - Decolorize

After taking user Yakk's advice and doing some more research into the behavior of mutex, lock_guard, thread, etc. I found this video on https://www.youtube.com on a ThreadPool class.

Here is a working piece of code:

ThreadPool.h

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

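#include <cstddef>
#include <vector>
#include <queue>
#include <functional>
#include <condition_variable>
#include <mutex>   // std::mutex, std::unique_lock
#include <memory>  // std::make_shared
#include <thread>
#include <future>
#include <utility> // std::move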

namespace linx {

class ThreadPool final {
public:
    using Task = std::function<void()>;

private:    
    std::vector<std::thread> _threads;
    std::queue<Task>         _tasks;

    std::condition_variable  _event;
    std::mutex               _eventMutex;
    bool                     _stopping = false;    

public:
    explicit ThreadPool( std::size_t numThreads ) {
        start( numThreads );
    }    

    ~ThreadPool() {
        stop();
    }

    ThreadPool( const ThreadPool& c ) = delete;
    ThreadPool& operator=( const ThreadPool& c ) = delete;

    template<class T>
    auto enqueue( T task )->std::future<decltype(task())> {
        auto wrapper = std::make_shared<std::packaged_task<decltype(task()) ()>>( std::move( task ) );

        {
            std::unique_lock<std::mutex> lock( _eventMutex );
            _tasks.emplace( [=] {
                (*wrapper)();
            } );
        }

        _event.notify_one();
        return wrapper->get_future();
    }

private:
    void start( std::size_t numThreads ) {
        for( auto i = 0u; i < numThreads; ++i ) {
            _threads.emplace_back( [this] {
                while( true ) {
                    Task task;

                    {
                        std::unique_lock<std::mutex> lock{ _eventMutex };
                        _event.wait( lock, [this] { return _stopping || !_tasks.empty(); } );

                        if( _stopping && _tasks.empty() )
                            break;

                        task = std::move( _tasks.front() );
                        _tasks.pop();
                    }

                    task();
                }
            } );
        }
    }

    void stop() noexcept {
        {
            std::unique_lock<std::mutex> lock{ _eventMutex };
            _stopping = true;
        }

        _event.notify_all();

        for( auto& thread : _threads )
            thread.join();

    }
};

} // namespace linx

#endif // !THREAD_POOL_H

main.cpp

#include <iostream>
#include <sstream>
#include "ThreadPool.h"

int main() {
    {
        linx::ThreadPool pool{ 4 }; // 4 threads

        auto f1 = pool.enqueue( [] {
            return 2;
        } );

        auto f2 = pool.enqueue( [] {
            return 4;
        } );
         
        auto a = f1.get();
        auto b = f2.get();

        auto f3 = pool.enqueue( [&] {
            return a + b;
        } );

        auto f4 = pool.enqueue( [&] {
           return a * b;
        } );

        std::cout << "f1 = " << a << '\n'
                  << "f2 = " << b << '\n'
                  << "f3 = " << f3.get() << '\n'
                  << "f4 = " << f4.get() << '\n';
    }                 

    std::cout << "\nPress any key and enter to quit.\n";
    std::cin.get();
    return 0;
}

I think this is something that might serve my purposes. The example here is deliberately simple. In my own IDE, using a couple of my other classes, I wrapped my execution timer around a thread pool object that ran 4 lambdas like the ones above. The first lambda used my other class to generate 1 million random integers in [1, 1000] using mt19937 seeded with a random device. The second did the same for floating point numbers in (0, 1.0), seeding mt19937 with chrono::high_resolution_clock. The third and fourth lambdas, following the pattern above, took the results and wrote them to an ostringstream, which I returned. Then I printed the results. On my PC (Intel Quad Core Extreme 3.0 GHz, 8 GB RAM, Windows 7 64-bit Home Premium), it took about 1720 milliseconds to generate a million random values for each case using 4 threads, and all four cores were in use.
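The classes used in that timing run are not shown, so the following is only a rough stand-in. It assumes std::async in place of the thread pool and std::chrono::steady_clock in place of the execution timer. random_ints, random_reals, TimedRun, and timed_generate are hypothetical names, and std::uniform_real_distribution produces values in [0, 1) instead of the open interval mentioned above.

```cpp
#include <chrono>
#include <cstddef>
#include <future>
#include <random>
#include <utility>
#include <vector>

// Hypothetical generator: count integers in [1, 1000], seeded from std::random_device.
inline std::vector<int> random_ints(std::size_t count) {
    std::mt19937 engine{std::random_device{}()};
    std::uniform_int_distribution<int> dist(1, 1000);
    std::vector<int> out(count);
    for (auto& v : out) v = dist(engine);
    return out;
}

// Hypothetical generator: count doubles in [0, 1), seeded from the clock.
inline std::vector<double> random_reals(std::size_t count) {
    std::mt19937 engine{static_cast<std::mt19937::result_type>(
        std::chrono::high_resolution_clock::now().time_since_epoch().count())};
    std::uniform_real_distribution<double> dist(0.0, 1.0);
    std::vector<double> out(count);
    for (auto& v : out) v = dist(engine);
    return out;
}

struct TimedRun {
    std::vector<int> ints;
    std::vector<double> reals;
    std::chrono::milliseconds elapsed{0};
};

// Runs both generators concurrently and measures the wall-clock time.
inline TimedRun timed_generate(std::size_t count) {
    const auto begin = std::chrono::steady_clock::now();
    auto ints = std::async(std::launch::async, random_ints, count);
    auto reals = std::async(std::launch::async, random_reals, count);

    TimedRun run;
    run.ints = ints.get();
    run.reals = reals.get();
    run.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::steady_clock::now() - begin);
    return run;
}
```

Timings from a sketch like this will vary with hardware, compiler, and optimization level, so they are not comparable to the figure quoted above.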

Decolorize answered 26/3, 2018 at 7:13 Comment(4)
I'm trying to figure out whether this handles nesting. In my case I have a collection of tasks of quite different 'sizes'. Part of each task can again be divided into subtasks, threading of which would greatly help for the large ones one ends up waiting for... - Rae
Re nested: I'm thinking of adding a borrow (and return) method/code, where borrow succeeds if threads are available, so the nested multithreaded code uses its own ThreadPool (the borrower), which is automatically destroyed on inner-loop end, and then should return. This gives flexibility without starting too many threads, I imagine. - Rae
Three years have passed, but I want to note that this gave a build error on the f4 definition line, auto f4 = pool.enqueue( [] { return a * b; } );. After I added & between the brackets, as in the f3 definition, the error went away. - Abdias
@OrkunKasapoglu That might have been a typo. It's been a while since I posted this and I'm not even sure if I still have the original source. Nice catch! - Decolorize
