I want to implement a prioritised ActionBlock<T>, so that I can conditionally give priority to some TInput items by using a Predicate<T>.
I have read the Parallel Extensions Extras samples and the Guide to Implementing Custom TPL Dataflow Blocks, but I still can't figure out how to implement this scenario.
---------------------------- EDIT ---------------------------
There are some tasks, of which 5 can run simultaneously. When the user pushes the button, some tasks (selected by a predicate function) should run with the highest priority.
This is the code I have written so far:
TaskScheduler taskSchedulerHigh;
ActionBlock<CustomObject> actionBlockLow;
ActionBlock<CustomObject> actionBlockHigh;
...
queuedTaskScheduler = new QueuedTaskScheduler(TaskScheduler.Default, 5);
taskSchedulerHigh = queuedTaskScheduler.ActivateNewQueue(0);
taskSchedulerLow = queuedTaskScheduler.ActivateNewQueue(1);
...
actionBlockHigh = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, SingleProducerConstrained = false, TaskScheduler = taskSchedulerHigh });
actionBlockLow = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, MaxMessagesPerTask = 1, TaskScheduler = taskSchedulerLow });
...
if (predicate(customObject))
    actionBlockHigh.Post(customObject);
else
    actionBlockLow.Post(customObject);
But the priority does not seem to take effect at all.
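For comparison, the behaviour I am after (at most N items in flight, with predicate-selected items served first) can be sketched without TPL Dataflow at all. This is only an illustration of the intent, not a fix for the blocks above; TwoLevelWorkQueue and all of its members are hypothetical names, and it only drains items that were posted before DrainAsync is called.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper, not part of TPL Dataflow: up to maxParallelism workers
// always take from the high-priority queue before touching the low one.
public sealed class TwoLevelWorkQueue<T>
{
    private readonly Queue<T> high = new Queue<T>();
    private readonly Queue<T> low = new Queue<T>();
    private readonly object gate = new object();
    private readonly Action<T> action;
    private readonly Predicate<T> isHighPriority;
    private readonly int maxParallelism;

    public TwoLevelWorkQueue(Action<T> action, Predicate<T> isHighPriority, int maxParallelism)
    {
        this.action = action;
        this.isHighPriority = isHighPriority;
        this.maxParallelism = maxParallelism;
    }

    // Routes the item to one of the two queues according to the predicate.
    public void Post(T item)
    {
        lock (gate)
        {
            (isHighPriority(item) ? high : low).Enqueue(item);
        }
    }

    // Starts the workers and completes once both queues have been emptied.
    public Task DrainAsync()
    {
        var workers = new Task[maxParallelism];
        for (int i = 0; i < maxParallelism; i++)
            workers[i] = Task.Run(WorkerLoop);
        return Task.WhenAll(workers);
    }

    private void WorkerLoop()
    {
        while (TryTake(out T item))
            action(item);
    }

    // High-priority items win; low-priority items are taken only when
    // no high-priority item is waiting.
    private bool TryTake(out T item)
    {
        lock (gate)
        {
            if (high.Count > 0) { item = high.Dequeue(); return true; }
            if (low.Count > 0) { item = low.Dequeue(); return true; }
            item = default(T);
            return false;
        }
    }
}
```

With a single worker, the items 1 to 6 posted up front, and x => x % 2 == 0 as the predicate, the action sees 2, 4, 6, 1, 3, 5. With 5 workers the same rule applies each time a worker picks its next item, but the completion order is no longer deterministic.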
---------------------------- EDIT ------------------
I found that when I use this code:
actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerHigh });
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerLow });
the application observes the task priorities correctly, but only one task executes at a time. Meanwhile, the first code block, shown again below, makes the application run 5 tasks simultaneously, but in the wrong priority order.
actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerHigh });
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerLow });
Update:
Thanks to svick, I should specify MaxMessagesPerTask for the block that uses taskSchedulerLow.
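A configuration sketch of what that looks like, assuming QueuedTaskScheduler comes from the ParallelExtensionsExtras samples (it is not in the base class library, so that source has to be included in the project) and that CustomObject, method and predicate are the placeholders used earlier. PrioritisedDispatcher is a hypothetical wrapper name. As I understand it, a block by default keeps reusing one task for as many messages as are available, so low-priority work that has already started never goes back through the scheduler; MaxMessagesPerTask = 1 makes the low-priority block request a new task for every message, which gives the high-priority queue the chance to be served first.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading.Tasks.Schedulers; // QueuedTaskScheduler (ParallelExtensionsExtras, assumed available)

public class CustomObject { } // placeholder type from the question

// Hypothetical wrapper around the two blocks from the question.
public class PrioritisedDispatcher
{
    private readonly ActionBlock<CustomObject> actionBlockHigh;
    private readonly ActionBlock<CustomObject> actionBlockLow;
    private readonly Predicate<CustomObject> predicate;

    public PrioritisedDispatcher(Action<CustomObject> method, Predicate<CustomObject> predicate)
    {
        this.predicate = predicate;

        // One shared scheduler with at most 5 concurrent tasks; queue 0 is
        // served before queue 1.
        var queuedTaskScheduler = new QueuedTaskScheduler(TaskScheduler.Default, 5);
        TaskScheduler taskSchedulerHigh = queuedTaskScheduler.ActivateNewQueue(0);
        TaskScheduler taskSchedulerLow = queuedTaskScheduler.ActivateNewQueue(1);

        actionBlockHigh = new ActionBlock<CustomObject>(method, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
            TaskScheduler = taskSchedulerHigh
        });

        actionBlockLow = new ActionBlock<CustomObject>(method, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
            MaxMessagesPerTask = 1, // one message per task, then back to the scheduler
            TaskScheduler = taskSchedulerLow
        });
    }

    // Routes the item to the high- or low-priority block.
    public bool Post(CustomObject item) =>
        predicate(item) ? actionBlockHigh.Post(item) : actionBlockLow.Post(item);
}
```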
T at all? Or is the priority an inherent/derived property of T? – Conic