Is it possible always to force a new thread with Task?
C

6

15

I am trying to create a new thread each time Task.Factory.StartNew is called. How can I run the code below without it throwing the exception?

static void Main(string[] args)
{
    int firstThreadId = 0;

    Task.Factory.StartNew(() => firstThreadId = Thread.CurrentThread.ManagedThreadId);

    for (int i = 0; i < 100; i++)
    {
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                Thread.Sleep(1000);
                if (firstThreadId == Thread.CurrentThread.ManagedThreadId)
                    throw new Exception("The first thread is reused.");
            }
        });
    }
    Console.Read();
}

EDIT: Here is the new code. If you comment out the first for statement, there is no problem. But with it in place, the message "Thread reused" is written to the console. Can you explain that? I am really confused.

static void Main(string[] args)
{
    ConcurrentDictionary<int, int> startedThreads = new ConcurrentDictionary<int, int>();

    for (int i = 0; i < 10; i++)
    {
        Task.Factory.StartNew(() =>
        {
            Task.Factory.StartNew(() =>
            {
                startedThreads.AddOrUpdate(Thread.CurrentThread.ManagedThreadId,
                    Thread.CurrentThread.ManagedThreadId, (a, b) => b);
            }, TaskCreationOptions.LongRunning);

            for (int j = 0; j < 100; j++)
            {
                Task.Factory.StartNew(() =>
                {
                    while (true)
                    {
                        Thread.Sleep(10);
                        if (startedThreads.ContainsKey(
                            Thread.CurrentThread.ManagedThreadId))
                                Console.WriteLine("Thread reused");
                    }
                }, TaskCreationOptions.LongRunning);
            }
        });
    }

    Console.Read();
}
Caporal answered 26/11, 2012 at 18:5 Comment(7)
The whole point of the Task library is that you don't have to work directly with threads. If you really need to work directly with threads, then don't use Tasks.Elinaelinor
Do you have to use the TPL? If your real code is structured like the example, you could replace the calls to Task.Factory.StartNew(...) with simply new Thread(...).Start(), and you would guarantee that a thread is actually created for each "task".Elaina
@KeithS: Tasks are much easier to work with than threads.Greenland
The code is really complex. The current code snippet is just a small part.Caporal
@Greenland Well, tasks exist so you don't have to work with threads. However if you want to work with threads, you don't need the abstraction provided by a task (in fact it gets in the way), and therefore I'd posit it would be easier for the OP to work with a thread and not a task, given what he says he needs.Elaina
@KeithS: No; I think the OP wants Task's nicer API & composability, but needs each callback to run on a separate thread.Greenland
Then Delegate.BeginInvoke() is the ticket. Still lets the ThreadPool handle it, and you get a callback when it's done, but you guarantee a thread per invocation.Elaina
C
3

Hello and thank you all for the answers. You all got +1. None of the suggested solutions worked for my case. The problem is that when you sleep a thread, it will be reused at some point. The people above suggested:

  • using LongRunning => This will not work if you have nested/child tasks
  • custom task scheduler => I tried to write my own and also tried this ThreadPerTaskScheduler, which also did not work.
  • using pure threads => Still failing...
  • you could also check this project at Multithreading.Scheduler github

My solution

I don't like it, but it works. Basically, I block the thread so it cannot be reused. Below are the extension methods and a working example. Again, thank you.

https://gist.github.com/4150635

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    public static class ThreadExtensions
    {
        /// <summary>
        /// Blocks the current thread for a period of time so that the thread cannot be reused by the threadpool.
        /// </summary>
        public static void Block(this Thread thread, int millisecondsTimeout)
        {
            new WakeSleepClass(millisecondsTimeout).SleepThread();
        }

        /// <summary>
        /// Blocks the current thread so that the thread cannot be reused by the threadpool.
        /// </summary>
        public static void Block(this Thread thread)
        {
            new WakeSleepClass().SleepThread();
        }

        /// <summary>
        /// Blocks the current thread for a period of time so that the thread cannot be reused by the threadpool.
        /// </summary>
        public static void Block(this Thread thread, TimeSpan timeout)
        {
            new WakeSleepClass(timeout).SleepThread();
        }

        class WakeSleepClass
        {
            bool locked = true;
            readonly TimerDisposer timerDisposer = new TimerDisposer();

            public WakeSleepClass(int sleepTime)
            {
                var timer = new Timer(WakeThread, timerDisposer, sleepTime, sleepTime);
                timerDisposer.InternalTimer = timer;
            }

            public WakeSleepClass(TimeSpan sleepTime)
            {
                var timer = new Timer(WakeThread, timerDisposer, sleepTime, sleepTime);
                timerDisposer.InternalTimer = timer;
            }

            public WakeSleepClass()
            {
                var timer = new Timer(WakeThread, timerDisposer, Timeout.Infinite, Timeout.Infinite);
                timerDisposer.InternalTimer = timer;
            }

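            public void SleepThread()
            {
                // Check the flag while holding the lock, so a Pulse that arrives
                // between the check and the Wait cannot be missed.
                lock (timerDisposer)
                {
                    while (locked)
                        Monitor.Wait(timerDisposer);
                    locked = true;
                }
            }

            public void WakeThread(object key)
            {
                // Clear the flag and signal the waiter under the same lock.
                lock (key)
                {
                    locked = false;
                    Monitor.Pulse(key);
                }
                ((TimerDisposer)key).InternalTimer.Dispose();
            }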

            class TimerDisposer
            {
                public Timer InternalTimer { get; set; }
            }
        }
    }

    class Program
    {
        private static readonly Queue<CancellationTokenSource> tokenSourceQueue = new Queue<CancellationTokenSource>();
        static void Main(string[] args)
        {
            CancellationTokenSource tokenSource = new CancellationTokenSource();
            tokenSourceQueue.Enqueue(tokenSource);

            ConcurrentDictionary<int, int> startedThreads = new ConcurrentDictionary<int, int>();
            for (int i = 0; i < 10; i++)
            {
                Thread.Sleep(1000);
                Task.Factory.StartNew(() =>
                {
                    startedThreads.AddOrUpdate(Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.ManagedThreadId, (a, b) => b);
                    for (int j = 0; j < 50; j++)
                        Task.Factory.StartNew(() => startedThreads.AddOrUpdate(Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.ManagedThreadId, (a, b) => b));

                    for (int j = 0; j < 50; j++)
                    {
                        Task.Factory.StartNew(() =>
                        {
                            while (!tokenSource.Token.IsCancellationRequested)
                            {
                                if (startedThreads.ContainsKey(Thread.CurrentThread.ManagedThreadId)) Console.WriteLine("Thread reused");
                                Thread.CurrentThread.Block(10);
                                if (startedThreads.ContainsKey(Thread.CurrentThread.ManagedThreadId)) Console.WriteLine("Thread reused");
                            }
                        }, tokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default)
                        .ContinueWith(task =>
                        {
                            WriteExceptions(task.Exception);
                            Console.WriteLine("-----------------------------");
                        }, TaskContinuationOptions.OnlyOnFaulted);
                    }
                    Thread.CurrentThread.Block();
                }, tokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default)
                .ContinueWith(task =>
                {
                    WriteExceptions(task.Exception);
                    Console.WriteLine("-----------------------------");
                }, TaskContinuationOptions.OnlyOnFaulted);
            }

            Console.Read();
        }

        private static void WriteExceptions(Exception ex)
        {
            Console.WriteLine(ex.Message);
            if (ex.InnerException != null)
                WriteExceptions(ex.InnerException);
        }
    }
}
Caporal answered 28/11, 2012 at 8:29 Comment(0)
D
26

If you specify TaskCreationOptions.LongRunning when starting the task, that provides a hint to the scheduler, which the default scheduler takes as an indicator to create a new thread for the task.

It's only a hint - I'm not sure I'd rely on it... but I haven't seen any counterexamples using the default scheduler.
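A minimal sketch of how one might check the hint described above. The class and method names (LongRunningSketch, RanOnPoolThread) are hypothetical and only serve the illustration. Each task is started with TaskCreationOptions.LongRunning on TaskScheduler.Default and reports whether it ended up on a thread-pool thread. Whether the result is always false is an implementation detail of the default scheduler, not a documented guarantee.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original answer.
public static class LongRunningSketch
{
    // Starts `count` tasks with the LongRunning hint and returns, for each task,
    // whether it observed itself running on a thread-pool thread.
    public static bool[] RanOnPoolThread(int count)
    {
        Task<bool>[] tasks = Enumerable.Range(0, count)
            .Select(_ => Task.Factory.StartNew(
                () => Thread.CurrentThread.IsThreadPoolThread,
                CancellationToken.None,
                TaskCreationOptions.LongRunning, // a hint only; the scheduler may ignore it
                TaskScheduler.Default))
            .ToArray();

        Task.WaitAll(tasks);
        return tasks.Select(t => t.Result).ToArray();
    }
}
```

Calling LongRunningSketch.RanOnPoolThread(10) and printing the flags shows, on a given runtime, whether the default scheduler honored the hint for each task.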

Destructor answered 26/11, 2012 at 18:8 Comment(3)
10x, it is working with TaskCreationOptions.LongRunning for the current example. I will try to reproduce the problem where even that wont help. brbCaporal
@mynkow: You've updated it, but not really described what you're seeing. Apparently something's wrong, but we don't know what...Destructor
10x, the message "Thread reused" is written to the console. Updated.Caporal
G
7

Adding to Jon Skeet's answer, if you want to guarantee that a new thread is created every time, you can write your own TaskScheduler that creates a new thread.

Greenland answered 26/11, 2012 at 18:9 Comment(0)
Y
4

Try this:

var taskCompletionSource = new TaskCompletionSource<bool>();
Thread t = new Thread(() =>
{
    try
    {
        Operation();
        taskCompletionSource.TrySetResult(true);
    }
    catch (Exception e)
    {
        taskCompletionSource.TrySetException(e);
    }
});
void Operation()
{
    // Some work in thread
}
t.Start();
await taskCompletionSource.Task;

You can also write extension methods for Action, Func and so on. For example:

public static Task RunInThread(
    this Action action,
    Action<Thread> initThreadAction = null)
{
    TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();

    Thread thread = new Thread(() =>
    {
        try
        {
            action();
            taskCompletionSource.TrySetResult(true);
        }
        catch (Exception e)
        {
            taskCompletionSource.TrySetException(e);
        }
    });
    initThreadAction?.Invoke(thread);
    thread.Start();

    return taskCompletionSource.Task;
}

or

public static Task<TResult> RunInThread<T1, T2, TResult>(
    this Func<T1, T2, TResult> function,
    T1 param1,
    T2 param2,
    Action<Thread> initThreadAction = null)
{
    TaskCompletionSource<TResult> taskCompletionSource = new TaskCompletionSource<TResult>();

    Thread thread = new Thread(() =>
    {
        try
        {
            TResult result = function(param1, param2);
            taskCompletionSource.TrySetResult(result);
        }
        catch (Exception e)
        {
            taskCompletionSource.TrySetException(e);
        }
    });
    initThreadAction?.Invoke(thread);
    thread.Start();

    return taskCompletionSource.Task;
}

and use it like this:

var result = await some_function.RunInThread(param1, param2).ConfigureAwait(true);
Yenyenisei answered 24/11, 2018 at 16:15 Comment(0)
E
2

Just create the threads with new Thread() and then Start() them:

static void Main(string[] args)
{
    ConcurrentDictionary<int, int> startedThreads = new ConcurrentDictionary<int, int>();

    for (int i = 0; i < 10; i++)
    {
        new Thread(() =>
        {
            new Thread(() =>
            {
                startedThreads.AddOrUpdate(Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.ManagedThreadId, (a, b) => b);
            }).Start();

            for (int j = 0; j < 100; j++)
            {
                new Thread(() =>
                {
                    while (true)
                    {
                        Thread.Sleep(10);
                        if (startedThreads.ContainsKey(Thread.CurrentThread.ManagedThreadId))
                            Console.WriteLine("Thread reused");
                    }
                }).Start();
            }
        }).Start();
    }

    Console.Read();

}

Tasks are supposed to be managed by the scheduler. The whole idea of Tasks is that the runtime decides when a new thread is needed. On the other hand, if you do need different threads, chances are that something else in the code is wrong, such as over-reliance on Thread.Sleep() or thread-local storage.

As pointed out, you can create your own TaskScheduler and use tasks to create threads, but then why use Tasks to begin with?

Enlarger answered 26/11, 2012 at 20:0 Comment(1)
I'd agree with this, except if the action running within the thread is an async Task.Oar
K
0

Here is a custom TaskScheduler that executes the tasks on a dedicated thread per task:

public class ThreadPerTask_TaskScheduler : TaskScheduler
{
    protected override void QueueTask(Task task)
    {
        var thread = new Thread(() => TryExecuteTask(task));
        thread.IsBackground = true;
        thread.Start();
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return TryExecuteTask(task);
    }

    protected override IEnumerable<Task> GetScheduledTasks() { yield break; }
}

Usage example:

var parallelOptions = new ParallelOptions()
{
    MaxDegreeOfParallelism = 3,
    TaskScheduler = new ThreadPerTask_TaskScheduler()
};

Parallel.ForEach(Enumerable.Range(1, 10), parallelOptions, item =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}" +
        $" [{Thread.CurrentThread.ManagedThreadId}]" +
        $" Processing #{item}" +
        (Thread.CurrentThread.IsBackground ? ", Background" : "") +
        (Thread.CurrentThread.IsThreadPoolThread ? ", ThreadPool" : ""));
    Thread.Sleep(1000); // Simulate work (Sleep blocks the thread; it does not use CPU)
});

Output:

20:38:56.770 [4] Processing #3, Background
20:38:56.770 [5] Processing #2, Background
20:38:56.770 [1] Processing #1
20:38:57.782 [1] Processing #4
20:38:57.783 [8] Processing #5, Background
20:38:57.783 [7] Processing #6, Background
20:38:58.783 [1] Processing #7
20:38:58.783 [10] Processing #8, Background
20:38:58.787 [9] Processing #9, Background
20:38:59.783 [1] Processing #10

Try it on Fiddle.

This custom TaskScheduler allows the current thread to participate in the computations too. This is demonstrated in the above example by the thread [1] processing the items #1, #4, #7 and #10. If you don't want this to happen, just replace the code inside the TryExecuteTaskInline with return false;.
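As a sketch of that variation, here is the same scheduler with inline execution refused. The class name NoInlineThreadPerTaskScheduler is hypothetical. With TryExecuteTaskInline returning false, a task started with RunSynchronously is queued through QueueTask instead, and the caller blocks until it completes. Note that, per the follow-up comment below, this keeps the calling thread out of the work but does not by itself give Parallel.ForEach one thread per item.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant of ThreadPerTask_TaskScheduler that never runs a task inline.
public class NoInlineThreadPerTaskScheduler : TaskScheduler
{
    // Every queued task gets its own dedicated background thread.
    protected override void QueueTask(Task task)
    {
        var thread = new Thread(() => TryExecuteTask(task)) { IsBackground = true };
        thread.Start();
    }

    // Refuse inline execution, so the calling thread never runs a task itself.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    // No queue is kept, so there is nothing to report to the debugger.
    protected override IEnumerable<Task> GetScheduledTasks() { yield break; }
}
```

To use it, pass an instance as the scheduler argument, for example Task.Factory.StartNew(action, CancellationToken.None, TaskCreationOptions.None, new NoInlineThreadPerTaskScheduler()).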

Another example, featuring the Task.Factory.StartNew method. Starting 100 tasks on 100 different threads:

var oneThreadPerTask = new ThreadPerTask_TaskScheduler();
Task[] tasks = Enumerable.Range(1, 100).Select(_ =>
{
    return Task.Factory.StartNew(() =>
    {
        Thread.Sleep(1000); // Simulate long-running work
    }, default, TaskCreationOptions.None, oneThreadPerTask);
}).ToArray();

In this case the current thread is not participating in the work, because all tasks are started behind the scenes by invoking their Start method, and not the RunSynchronously.

Kresic answered 25/8, 2021 at 21:4 Comment(3)
Hi Theo, In the example output, #1, #4, #7 and #10 uses the same non-background thread, the one with ManagedThreadId = 1. Why don't all tasks use a separate background thread?Sempstress
@Sempstress that's because the Parallel.ForEach uses the current thread as a worker thread. Technically it runs one of the Tasks it creates with RunSynchronously instead of Start. This "special" task is scheduled on the ThreadPerTask_TaskScheduler with the TryExecuteTaskInline method instead of the QueueTask. The implementation of the TryExecuteTaskInline method is return TryExecuteTask(task);, which honors the request for inline execution. If you change the implementation to return false, all items will be processed on dedicated threads (demo).Kresic
@Sempstress actually on closer inspection the return false implementation does not result in a dedicated thread per item. The Parallel.ForEach method may start fewer tasks than the total number of items. TBH I don't have a perfect knowledge of the inner workings of the Parallel.For/Parallel.ForEach methods. They are a quite complex piece of software!Kresic
