Code for a simple thread pool in C# [closed]
Asked Answered
D

2

63

Looking for some sample code (C#) for a simple thread pool implementation.

I found one on codeproject, but the codebase was just huge and I don't need all that functionality.

This is more for educational purposes anyways.

Dwarf answered 12/1, 2009 at 15:0 Comment(3)
The short answer is that you shouldn't roll your own unless it's a learning exercise. If it is a learning exercise, you'll learn more by writing it on your own than by copying someone else's code. :)Toyatoyama
@Greg: Are there no circumstances where you might want a pool of threads that are independant of the existing standard ThreadPool ?Brigitte
@Anthony: Reading what's gone into the built-in threadpool in Joe Duffy's (and others') various postings, I'm reasonably confident that any threadpool I slapped together would be vastly weaker than the one that already exists.Toyatoyama
T
31

There is no need to implement your own, since it is not very hard to use the existing .NET implementation.

From ThreadPool Documentation:

using System;
using System.Threading;

public class Fibonacci
{
    public Fibonacci(int n, ManualResetEvent doneEvent)
    {
        _n = n;
        _doneEvent = doneEvent;
    }

    // Wrapper method for use with thread pool.
    public void ThreadPoolCallback(Object threadContext)
    {
        int threadIndex = (int)threadContext;
        Console.WriteLine("thread {0} started...", threadIndex);
        _fibOfN = Calculate(_n);
        Console.WriteLine("thread {0} result calculated...", threadIndex);
        _doneEvent.Set();
    }

    // Recursive method that calculates the Nth Fibonacci number.
    public int Calculate(int n)
    {
        if (n <= 1)
        {
            return n;
        }

        return Calculate(n - 1) + Calculate(n - 2);
    }

    public int N { get { return _n; } }
    private int _n;

    public int FibOfN { get { return _fibOfN; } }
    private int _fibOfN;

    private ManualResetEvent _doneEvent;
}

public class ThreadPoolExample
{
    static void Main()
    {
        const int FibonacciCalculations = 10;

        // One event is used for each Fibonacci object
        ManualResetEvent[] doneEvents = new ManualResetEvent[FibonacciCalculations];
        Fibonacci[] fibArray = new Fibonacci[FibonacciCalculations];
        Random r = new Random();

        // Configure and launch threads using ThreadPool:
        Console.WriteLine("launching {0} tasks...", FibonacciCalculations);
        for (int i = 0; i < FibonacciCalculations; i++)
        {
            doneEvents[i] = new ManualResetEvent(false);
            Fibonacci f = new Fibonacci(r.Next(20,40), doneEvents[i]);
            fibArray[i] = f;
            ThreadPool.QueueUserWorkItem(f.ThreadPoolCallback, i);
        }

        // Wait for all threads in pool to calculation...
        WaitHandle.WaitAll(doneEvents);
        Console.WriteLine("All calculations are complete.");

        // Display the results...
        for (int i= 0; i<FibonacciCalculations; i++)
        {
            Fibonacci f = fibArray[i];
            Console.WriteLine("Fibonacci({0}) = {1}", f.N, f.FibOfN);
        }
    }
}
Tauromachy answered 12/1, 2009 at 15:7 Comment(5)
thread pool has huge limitationsCosecant
one pool per app domain, can't try abort waiting thread etc. There are heaps of info out there stackoverflow.com/questions/145304 codeproject.com/KB/threads/smartthreadpool.aspx codeproject.com/KB/threads/cancellablethreadpool.aspxCosecant
@Jeffrey: Where are those limitations brought up by the OP? Where in the OP do you see any evidence that the OP needs to roll his own thread pool?Tauromachy
@Tauromachy Limitations do not need to be brought up in order for them to exist. Furthermore, the OP was explicitly speaking about 'educational purposes'. I for one needed a dedicated ThreadPool, for purposes of benchmarking, so that no other code would interfere with it and that I could adjust it without affecting other existing code in libraries I use, that might make use of .net's builtin one. But even if I hadn't had the need for a custom pool, I might still just be curious how one could be implemented. In other words, I see plenty reasons why someone might be interested in a custom impl.Aluminothermy
@EugeneBeresovksy: You might want to actually read the OP, read the answer and then the comments. Perhaps check the OP's history as well...Tauromachy
H
53

This is the simplest, naive, thread-pool implementation for educational purposes I could come up with (C# / .NET 3.5). It is not using the .NET's thread pool implementation in any way.

using System;
using System.Collections.Generic;
using System.Threading;

namespace SimpleThreadPool
{
    public sealed class Pool : IDisposable
    {
        public Pool(int size)
        {
            this._workers = new LinkedList<Thread>();
            for (var i = 0; i < size; ++i)
            {
                var worker = new Thread(this.Worker) { Name = string.Concat("Worker ", i) };
                worker.Start();
                this._workers.AddLast(worker);
            }
        }

        public void Dispose()
        {
            var waitForThreads = false;
            lock (this._tasks)
            {
                if (!this._disposed)
                {
                    GC.SuppressFinalize(this);

                    this._disallowAdd = true; // wait for all tasks to finish processing while not allowing any more new tasks
                    while (this._tasks.Count > 0)
                    {
                        Monitor.Wait(this._tasks);
                    }

                    this._disposed = true;
                    Monitor.PulseAll(this._tasks); // wake all workers (none of them will be active at this point; disposed flag will cause then to finish so that we can join them)
                    waitForThreads = true;
                }
            }
            if (waitForThreads)
            {
                foreach (var worker in this._workers)
                {
                    worker.Join();
                }
            }
        }

        public void QueueTask(Action task)
        {
            lock (this._tasks)
            {
                if (this._disallowAdd) { throw new InvalidOperationException("This Pool instance is in the process of being disposed, can't add anymore"); }
                if (this._disposed) { throw new ObjectDisposedException("This Pool instance has already been disposed"); }
                this._tasks.AddLast(task);
                Monitor.PulseAll(this._tasks); // pulse because tasks count changed
            }
        }

        private void Worker()
        {
            Action task = null;
            while (true) // loop until threadpool is disposed
            {
                lock (this._tasks) // finding a task needs to be atomic
                {
                    while (true) // wait for our turn in _workers queue and an available task
                    {
                        if (this._disposed)
                        {
                            return;
                        }
                        if (null != this._workers.First && object.ReferenceEquals(Thread.CurrentThread, this._workers.First.Value) && this._tasks.Count > 0) // we can only claim a task if its our turn (this worker thread is the first entry in _worker queue) and there is a task available
                        {
                            task = this._tasks.First.Value;
                            this._tasks.RemoveFirst();
                            this._workers.RemoveFirst();
                            Monitor.PulseAll(this._tasks); // pulse because current (First) worker changed (so that next available sleeping worker will pick up its task)
                            break; // we found a task to process, break out from the above 'while (true)' loop
                        }
                        Monitor.Wait(this._tasks); // go to sleep, either not our turn or no task to process
                    }
                }

                task(); // process the found task
                lock(this._tasks)
                {
                    this._workers.AddLast(Thread.CurrentThread);
                }
                task = null;
            }
        }

        private readonly LinkedList<Thread> _workers; // queue of worker threads ready to process actions
        private readonly LinkedList<Action> _tasks = new LinkedList<Action>(); // actions to be processed by worker threads
        private bool _disallowAdd; // set to true when disposing queue but there are still tasks pending
        private bool _disposed; // set to true when disposing queue and no more tasks are pending
    }


    public static class Program
    {
        static void Main()
        {
            using (var pool = new Pool(5))
            {
                var random = new Random();
                Action<int> randomizer = (index =>
                {
                    Console.WriteLine("{0}: Working on index {1}", Thread.CurrentThread.Name, index);
                    Thread.Sleep(random.Next(20, 400));
                    Console.WriteLine("{0}: Ending {1}", Thread.CurrentThread.Name, index);
                });

                for (var i = 0; i < 40; ++i)
                {
                    var i1 = i;
                    pool.QueueTask(() => randomizer(i1));
                }
            }
        }
    }
}
Hardback answered 12/1, 2009 at 18:55 Comment(6)
+1 Thank you. I was using this snippet but after an extremely long period of time, I encountered an error: Unhandled Exception: System.NullReferenceException: Object reference not set to an instance of an object. at System.Collections.Generic.LinkedList'1.InternalInsertNodeBefore(LinkedListNode>' node, LinkedListNode'1 newNode) at System.Collections.Generic.LinkedList'1.AddLast(T value) at Prog.Pool.Worker()`. Any idea what is causing this?Anastasiaanastasie
@Anastasiaanastasie not sure what the problem might be, but if I had to guess I'd say it is related to the fact that _workers linked list is being accessed outside of lock. If using .NET 4, you could try using ConcurrentQueue<Action> instead.Hardback
+1 Thank you. You are right. I asked a question here: #16764126 It seems that the problem was indeed due to the missing lock. Thanks for your time. I'm currently using .NET 3.5 and this works like a charm.Anastasiaanastasie
Your code is awesome, but I noticed that it seems that dispose does not wait the threads to end correctly. Try add the following lines in the main after the pool is disposed: Console.WriteLine("Thread pool disposed!"); Thread.Sleep(2000);. Then place a breakpoint after that line (just to avoid the console to close). You'll notice that after Thread pool disposed! the workers will continue to write their operations (like Worker 0: Ending 39)Angeliqueangelis
To me the problem could be the fact that if a Worker is running an Action it isn't in the List this._workers, so, the Dispose method (in foreach (var worker in this._workers)) will not wait it to endAngeliqueangelis
A solution could be to have another list( that will never be "touched" by the methods) containing all the threads. Then in the Dispose method we'll foreach on that listAngeliqueangelis
T
31

There is no need to implement your own, since it is not very hard to use the existing .NET implementation.

From ThreadPool Documentation:

using System;
using System.Threading;

public class Fibonacci
{
    public Fibonacci(int n, ManualResetEvent doneEvent)
    {
        _n = n;
        _doneEvent = doneEvent;
    }

    // Wrapper method for use with thread pool.
    public void ThreadPoolCallback(Object threadContext)
    {
        int threadIndex = (int)threadContext;
        Console.WriteLine("thread {0} started...", threadIndex);
        _fibOfN = Calculate(_n);
        Console.WriteLine("thread {0} result calculated...", threadIndex);
        _doneEvent.Set();
    }

    // Recursive method that calculates the Nth Fibonacci number.
    public int Calculate(int n)
    {
        if (n <= 1)
        {
            return n;
        }

        return Calculate(n - 1) + Calculate(n - 2);
    }

    public int N { get { return _n; } }
    private int _n;

    public int FibOfN { get { return _fibOfN; } }
    private int _fibOfN;

    private ManualResetEvent _doneEvent;
}

public class ThreadPoolExample
{
    static void Main()
    {
        const int FibonacciCalculations = 10;

        // One event is used for each Fibonacci object
        ManualResetEvent[] doneEvents = new ManualResetEvent[FibonacciCalculations];
        Fibonacci[] fibArray = new Fibonacci[FibonacciCalculations];
        Random r = new Random();

        // Configure and launch threads using ThreadPool:
        Console.WriteLine("launching {0} tasks...", FibonacciCalculations);
        for (int i = 0; i < FibonacciCalculations; i++)
        {
            doneEvents[i] = new ManualResetEvent(false);
            Fibonacci f = new Fibonacci(r.Next(20,40), doneEvents[i]);
            fibArray[i] = f;
            ThreadPool.QueueUserWorkItem(f.ThreadPoolCallback, i);
        }

        // Wait for all threads in pool to calculation...
        WaitHandle.WaitAll(doneEvents);
        Console.WriteLine("All calculations are complete.");

        // Display the results...
        for (int i= 0; i<FibonacciCalculations; i++)
        {
            Fibonacci f = fibArray[i];
            Console.WriteLine("Fibonacci({0}) = {1}", f.N, f.FibOfN);
        }
    }
}
Tauromachy answered 12/1, 2009 at 15:7 Comment(5)
thread pool has huge limitationsCosecant
one pool per app domain, can't try abort waiting thread etc. There are heaps of info out there stackoverflow.com/questions/145304 codeproject.com/KB/threads/smartthreadpool.aspx codeproject.com/KB/threads/cancellablethreadpool.aspxCosecant
@Jeffrey: Where are those limitations brought up by the OP? Where in the OP do you see any evidence that the OP needs to roll his own thread pool?Tauromachy
@Tauromachy Limitations do not need to be brought up in order for them to exist. Furthermore, the OP was explicitly speaking about 'educational purposes'. I for one needed a dedicated ThreadPool, for purposes of benchmarking, so that no other code would interfere with it and that I could adjust it without affecting other existing code in libraries I use, that might make use of .net's builtin one. But even if I hadn't had the need for a custom pool, I might still just be curious how one could be implemented. In other words, I see plenty reasons why someone might be interested in a custom impl.Aluminothermy
@EugeneBeresovksy: You might want to actually read the OP, read the answer and then the comments. Perhaps check the OP's history as well...Tauromachy

© 2022 - 2024 — McMap. All rights reserved.