Tuesday, January 4, 2011

Customising the Task.Factory Scheduler

I think it is important to be consistent in your design approach to problems wherever possible, but it would be naive to think that one approach or tool will be best for every problem. I had a threading issue to solve recently, and I have been making heavy use of the new .NET 4 TaskFactory in the Task Parallel Library (TPL). The problem was that one part of the system must be single threaded and must process incoming events in the order they were received. I was fully prepared to write a message queue or pump (or find one).

I was delighted to find that the TaskFactory can use a custom scheduler. The scheduler controls when tasks begin and how many threads are available to process incoming tasks. Perfect. I just needed a single-threaded implementation that processes tasks in the same order as they are queued.

Even better, the MSDN documentation includes an article showing how to do just this. The custom scheduler takes a constructor argument for the maximum number of threads it may use at once, so if a future change allows more threads, this will be easy to alter. Also, if I find a better way, I can revert to the default task factory, which is of the same type as the throttled factory, so that would be an easy change too.
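To make the throttling idea concrete, here is a minimal sketch of a TaskFactory built on a scheduler with a concurrency limit. It is not the scheduler from this post: to keep the snippet self-contained it uses ConcurrentExclusiveSchedulerPair as a stand-in, which assumes .NET 4.5 or later. The class and method names (ThrottleSketch, MaxObservedConcurrency) are made up for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleSketch
{
    // Runs taskCount small tasks through a factory whose scheduler allows at
    // most maxThreads tasks at once, and returns the highest overlap observed.
    public static int MaxObservedConcurrency(int maxThreads, int taskCount)
    {
        // Stand-in for the custom scheduler (assumption: .NET 4.5 or later).
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxThreads);
        var factory = new TaskFactory(pair.ConcurrentScheduler);

        int running = 0;
        int maxSeen = 0;
        var gate = new object();
        var tasks = new Task[taskCount];

        for (int i = 0; i < taskCount; i++)
        {
            tasks[i] = factory.StartNew(() =>
            {
                lock (gate)
                {
                    running++;
                    if (running > maxSeen)
                    {
                        maxSeen = running;
                    }
                }

                // Hold the slot briefly so any overlap would show up.
                Thread.Sleep(1);

                lock (gate)
                {
                    running--;
                }
            });
        }

        Task.WaitAll(tasks);
        return maxSeen;
    }

    public static void Main()
    {
        Console.WriteLine("limit 1 -> max seen " + MaxObservedConcurrency(1, 50));
        Console.WriteLine("limit 4 -> max seen " + MaxObservedConcurrency(4, 50));
    }
}
```

Changing the limit is a one-argument change, which is the same property that makes the constructor argument on the custom scheduler attractive.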

Since the TaskFactory used by my problem class is the same type as the default factory (just another instance, with a custom scheduler held internally), I can configure StructureMap to hand out the appropriate factory. Pretty cool stuff.
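The reason this works is that the consuming class only ever sees a TaskFactory. A rough sketch of the idea, wired by hand rather than through StructureMap, might look like the following. EventProcessor and CompositionSketch are hypothetical names, and the serial scheduler is again a stand-in (ConcurrentExclusiveSchedulerPair, which assumes .NET 4.5 or later) rather than the scheduler from this post.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical consumer: it only knows that it was given a TaskFactory.
public sealed class EventProcessor
{
    private readonly TaskFactory factory;

    public EventProcessor(TaskFactory factory)
    {
        if (factory == null)
        {
            throw new ArgumentNullException("factory");
        }

        this.factory = factory;
    }

    // Schedules the work on whichever factory was injected.
    public Task Handle(Action work)
    {
        return this.factory.StartNew(work);
    }
}

public static class CompositionSketch
{
    public static void Main()
    {
        // Stand-in serial scheduler (assumption: .NET 4.5 or later).
        var serialScheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
        var serialFactory = new TaskFactory(serialScheduler);

        // Same consumer type, two different factories.
        var ordered = new EventProcessor(serialFactory);
        var unordered = new EventProcessor(Task.Factory);

        ordered.Handle(() => Console.WriteLine("ran on the serial factory")).Wait();
        unordered.Handle(() => Console.WriteLine("ran on the default factory")).Wait();
    }
}
```

A container would take over the job of the Main method here, deciding which TaskFactory instance each class receives.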

Here's my final code for the scheduler. It is pretty much the same as the code in the MSDN article, with some style changes.


    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    /// <summary>
    /// Provides a task scheduler that ensures a maximum concurrency level while
    /// running on top of the ThreadPool.
    /// Source: http://msdn.microsoft.com/en-us/library/ee789351.aspx
    /// Author: Microsoft 2009
    /// </summary>
    public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
    {
        /// <summary>
        /// The maximum concurrency level allowed by this scheduler.
        /// </summary>
        private readonly int maxDegreeOfParallelism;

        /// <summary>
        /// The list of tasks to be executed.
        /// </summary>
        private readonly LinkedList<Task> tasks = new LinkedList<Task>();

        /// <summary>
        /// Whether the current thread is processing work items.
        /// </summary>
        [ThreadStatic]
        private static bool currentThreadIsProcessingItems;

        /// <summary>
        /// The number of delegates currently queued or running to process work items.
        /// </summary>
        private int delegatesQueuedOrRunning;

        /// <summary>
        /// Initializes a new instance of the LimitedConcurrencyLevelTaskScheduler class with the
        /// specified degree of parallelism.
        /// </summary>
        /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
        public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
        {
            this.maxDegreeOfParallelism = maxDegreeOfParallelism;

            if (this.maxDegreeOfParallelism < 1)
            {
                throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
            }
        }

        /// <summary>
        /// Gets the maximum concurrency level supported by this scheduler.
        /// </summary>
        public sealed override int MaximumConcurrencyLevel
        {
            get { return this.maxDegreeOfParallelism; }
        }

        /// <summary>
        /// Queues a task to the scheduler.
        /// </summary>
        /// <param name="task">The task to be queued.</param>
        protected sealed override void QueueTask(Task task)
        {
            // Add the task to the list of tasks to be processed.  If there aren't enough
            // delegates currently queued or running to process tasks, schedule another.
            lock (this.tasks)
            {
                this.tasks.AddLast(task);
                if (this.delegatesQueuedOrRunning < this.maxDegreeOfParallelism)
                {
                    ++this.delegatesQueuedOrRunning;
                    NotifyThreadPoolOfPendingWork();
                }
            }
        }

        /// <summary>
        /// Attempts to execute the specified task on the current thread.
        /// </summary>
        /// <param name="task">The task to be executed.</param>
        /// <param name="taskWasPreviouslyQueued">A Boolean denoting whether or not task has previously been queued. If this parameter is True, then the task may have been previously queued (scheduled); if False, then the task is known not to have been queued, and this call is being made in order to execute the task inline without queuing it.</param>
        /// <returns>
        /// Whether the task could be executed on the current thread.
        /// </returns>
        /// <exception cref="T:System.ArgumentNullException">The <paramref name="task"/> argument is null.</exception>
        /// <exception cref="T:System.InvalidOperationException">The <paramref name="task"/> was already executed.</exception>
        protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // If this thread isn't already processing a task, we don't support inlining
            if (!currentThreadIsProcessingItems)
            {
                return false;
            }

            // If the task was previously queued, remove it from the queue
            if (taskWasPreviouslyQueued)
            {
                TryDequeue(task);
            }

            // Try to run the task.
            return TryExecuteTask(task);
        }

        /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
        /// <param name="task">The task to be removed.</param>
        /// <returns>Whether the task could be found and removed.</returns>
        protected sealed override bool TryDequeue(Task task)
        {
            lock (this.tasks) return this.tasks.Remove(task);
        }

        /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
        /// <returns>An enumerable of the tasks currently scheduled.</returns>
        protected sealed override IEnumerable<Task> GetScheduledTasks()
        {
            var lockTaken = false;
            try
            {
                Monitor.TryEnter(this.tasks, ref lockTaken);
                if (lockTaken)
                {
                    return this.tasks.ToArray();
                } else
                {
                    throw new NotSupportedException();
                }
            } finally
            {
                if (lockTaken)
                {
                    Monitor.Exit(this.tasks);
                }
            }
        }

        /// <summary>
        /// Informs the ThreadPool that there's work to be executed for this scheduler.
        /// </summary>
        private void NotifyThreadPoolOfPendingWork()
        {
            ThreadPool.UnsafeQueueUserWorkItem(
                _ =>
                {
                    // Note that the current thread is now processing work items.
                    // This is necessary to enable inlining of tasks into this thread.
                    currentThreadIsProcessingItems = true;
                    try
                    {
                        // Process all available items in the queue.
                        while (true)
                        {
                            Task item;
                            lock (this.tasks)
                            {
                                // When there are no more items to be processed,
                                // note that we're done processing, and get out.
                                if (this.tasks.Count == 0)
                                {
                                    --this.delegatesQueuedOrRunning;
                                    break;
                                }

                                // Get the next item from the queue
                                item = this.tasks.First.Value;
                                this.tasks.RemoveFirst();
                            }

                            // Execute the task we pulled out of the queue
                            TryExecuteTask(item);
                        }
                    } finally
                    {
                        // We're done processing items on the current thread
                        currentThreadIsProcessingItems = false;
                    }
                },
            null);
        }
    }


    /// <summary>
    /// Provides a task scheduler that ensures only one task is executing at a time, and that tasks
    /// execute in the order that they were queued.
    /// Source: http://msdn.microsoft.com/en-us/library/ee789351.aspx
    /// Author: Microsoft 2009
    /// </summary>
    public sealed class OrderedTaskScheduler : LimitedConcurrencyLevelTaskScheduler
    {
        /// <summary>
        /// Initializes a new instance of the <see cref="OrderedTaskScheduler"/> class.
        /// </summary>
        public OrderedTaskScheduler() : base(1)
        {
        }
    }



Finally, I wrote a test program to prove two things. First, that the default TaskFactory _might_ complete tasks in a different order from the one in which they were queued. Second, that running with limited concurrency, i.e. one thread, guarantees the tasks are completed in the order they were queued.

Here's my test console program:

namespace TaskInGuaranteedOrder
{
    using System;
    using System.Collections.Concurrent;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks;

    public static class Program
    {
        public static void Main()
        {
            // Test the scheduler to ensure it's doing its job correctly
            SchedulerCheck(5);

            // Test Default Factory
            SequenceCheck(null, 500);

            // Test 1 Thread Factory
            SequenceCheck(new LimitedConcurrencyLevelTaskScheduler(1), 1000);
        }

        private static void SchedulerCheck(int maxThreads)
        {
            Console.WriteLine("**** Scheduler Check " + maxThreads + " threads maximum ****");
            var timer = Stopwatch.StartNew();
            var limitedScheduler = new LimitedConcurrencyLevelTaskScheduler(maxThreads);
            var factory = new TaskFactory(limitedScheduler);
            var bag = new ConcurrentDictionary<int, int>();

            var tasks = new Task[1500];
            for (int i = 0; i < 1500; i++)
            {
                tasks[i] = factory.StartNew(() =>
                {
                    int threadid = Thread.CurrentThread.ManagedThreadId;

                    // Console.WriteLine("{0} on thread {1}", i, threadid);
                    if (!bag.ContainsKey(threadid))
                    {
                        bag.TryAdd(threadid, threadid);
                    }
                });
            }
            
            Task.WaitAll(tasks);

            if (bag.Count > maxThreads)
            {
                Console.WriteLine(timer.ElapsedMilliseconds.ToString("#,000") + "ms. Scheduler Check FAILED.");
            } else if (bag.Count < maxThreads)
            {
                Console.WriteLine(timer.ElapsedMilliseconds.ToString("#,000") + "ms. Scheduler Check Warning: Fewer threads used than max specified " + bag.Count + "/" + maxThreads);
            } else
            {
                Console.WriteLine(timer.ElapsedMilliseconds.ToString("#,000") + "ms. Ok.");
            }

            Console.WriteLine();
        }

        private static void SequenceCheck(TaskScheduler scheduler, int iterations)
        {
            var timer = Stopwatch.StartNew();
            TaskFactory factory;
            if (scheduler == null)
            {
                factory = Task.Factory;
                Console.WriteLine("**** Sequence Check ...DEFAULT FACTORY... ****");
            } else
            {
                factory = new TaskFactory(scheduler);
                Console.WriteLine("**** Sequence Check... 1 THREAD FACTORY... ****");
            }

            Console.WriteLine(iterations + " iterations: ");
            bool failed = false;

            // Do it a few times to ensure consistency
            for (int repeat = 0; repeat < iterations; repeat++)
            {
                var taskExecOrder = new ConcurrentQueue<int>();
                var tasks = new Task[10];
                for (int index = 0; index < 10; index++)
                {
                    int index1 = index;
                    tasks[index] = factory.StartNew(() => taskExecOrder.Enqueue(index1));
                }

                Task.WaitAll(tasks);

                int result = 0, last = -1;
                bool outOfOrder = false;
                while (taskExecOrder.TryDequeue(out result))
                {
                    outOfOrder = outOfOrder | (last != result - 1);
                    last = result;
                }

                if (outOfOrder)
                {
                    failed = true;
                } 

                if (repeat % 3 == 0)
                {
                    Console.Write(".");
                }
            }

            timer.Stop();
            Console.WriteLine();
            if (failed)
            {
                Console.WriteLine(timer.ElapsedMilliseconds.ToString("#,000") + "ms. FAILED! Out of Sequence Order.");
            } else
            {
                Console.WriteLine(timer.ElapsedMilliseconds.ToString("#,000") + "ms. Ok.");
            }

            Console.WriteLine();
        }
    }
}


And the output:
Using the default factory, you can see that one or more tasks complete out of order, hence the "FAILED" text. The check runs 500 iterations of 10 tasks each, so there's a good chance some will complete out of order if it's possible to do so. Lastly, using the limited concurrency scheduler with one thread, the results come back in the expected order.
