Tuesday, January 4, 2011

Customising the Task.Factory Scheduler

I think it is important to be consistent in your design approach to problems wherever possible, but it would be naive to think that one approach or tool will be best for every problem.  I had a threading issue to solve recently, and I have been making heavy use of the new .NET 4 Task Factory in the TPL (Task Parallel Library).  The problem was that one part of the system must be single threaded and must process incoming events in the order they were received.  I was fully prepared to write a message queue or pump (or find one).

I was delighted to find that the Task Factory can use a custom scheduler.  The scheduler controls when tasks begin and how many threads are available to process incoming tasks.  Perfect. I just needed a single-threaded implementation that processes tasks in the order they are queued.

Even better, the MSDN documentation includes an article showing you how to do just this.  The custom scheduler takes a constructor argument for the maximum number of threads to use. So if a future change allows more threads, this will be easy to adjust.  Also, if I find a better way, I can revert to the default task factory; it is of the same type as the throttled factory, so that would be an easy change too.

Seeing as the Task Factory used by my problem class is the same type as the default factory (just another instance, with a custom scheduler internal field), I can configure StructureMap to deal out the appropriate factory. Pretty cool stuff.
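To make the "same type" point concrete, here is a minimal sketch. The names DedicatedThreadTaskScheduler, EventProcessor and FactorySwapSketch are made up for this illustration, and the scheduler is a deliberately simple stand-in (one dedicated thread draining a FIFO queue), not the MSDN scheduler from this post. The consumer only ever sees a TaskFactory, so it cannot tell whether it was handed Task.Factory or a factory with a custom scheduler.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in scheduler: a single background thread runs tasks in FIFO order.
public sealed class DedicatedThreadTaskScheduler : TaskScheduler, IDisposable
{
    // BlockingCollection wraps a ConcurrentQueue by default, so items come out in the order added.
    private readonly BlockingCollection<Task> queue = new BlockingCollection<Task>();
    private readonly Thread worker;

    public DedicatedThreadTaskScheduler()
    {
        this.worker = new Thread(() =>
        {
            foreach (Task task in this.queue.GetConsumingEnumerable())
            {
                this.TryExecuteTask(task);
            }
        });
        this.worker.IsBackground = true;
        this.worker.Start();
    }

    public override int MaximumConcurrencyLevel
    {
        get { return 1; }
    }

    public void Dispose()
    {
        // Lets the worker loop finish once the queue is drained.
        this.queue.CompleteAdding();
    }

    protected override void QueueTask(Task task)
    {
        this.queue.Add(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // Never inline, so every task runs on the dedicated thread.
        return false;
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return this.queue.ToArray();
    }
}

// Hypothetical consumer: it depends only on TaskFactory, not on any scheduler type.
public sealed class EventProcessor
{
    private readonly TaskFactory factory;

    public EventProcessor(TaskFactory factory)
    {
        this.factory = factory;
    }

    public Task Handle(Action work)
    {
        return this.factory.StartNew(work);
    }
}

public static class FactorySwapSketch
{
    // Queues ten work items through the ordered factory and returns the order they ran in.
    public static int[] Run()
    {
        using (var scheduler = new DedicatedThreadTaskScheduler())
        {
            // Passing Task.Factory here instead would compile unchanged.
            var processor = new EventProcessor(new TaskFactory(scheduler));
            var seen = new ConcurrentQueue<int>();
            var tasks = new Task[10];
            for (int i = 0; i < tasks.Length; i++)
            {
                int n = i;
                tasks[i] = processor.Handle(() => seen.Enqueue(n));
            }

            Task.WaitAll(tasks);
            return seen.ToArray();
        }
    }
}
```

An IoC container can then decide which TaskFactory instance each consumer receives, without the consumer changing at all.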

Here's my final code for the scheduler, which is pretty much the same as the MSDN article above, apart from some style changes.


    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    /// <summary>
    /// Provides a task scheduler that ensures a maximum concurrency level while
    /// running on top of the ThreadPool.
    /// Source: http://msdn.microsoft.com/en-us/library/ee789351.aspx
    /// Author: Microsoft 2009
    /// </summary>
    public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
    {
        /// <summary>
        /// The maximum concurrency level allowed by this scheduler.
        /// </summary>
        private readonly int maxDegreeOfParallelism;

        /// <summary>
        /// The list of tasks to be executed.
        /// </summary>
        private readonly LinkedList<Task> tasks = new LinkedList<Task>();

        /// <summary>
        /// Whether the current thread is processing work items.
        /// </summary>
        [ThreadStatic]
        private static bool currentThreadIsProcessingItems;

        /// <summary>
        /// The number of delegates currently queued or running to process work items.
        /// </summary>
        private int delegatesQueuedOrRunning;

        /// <summary>
        /// Initializes a new instance of the LimitedConcurrencyLevelTaskScheduler class with the
        /// specified degree of parallelism.
        /// </summary>
        /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
        public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
        {
            this.maxDegreeOfParallelism = maxDegreeOfParallelism;

            if (this.maxDegreeOfParallelism < 1)
            {
                throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
            }
        }

        /// <summary>
        /// Gets the maximum concurrency level supported by this scheduler.
        /// </summary>
        public sealed override int MaximumConcurrencyLevel
        {
            get { return this.maxDegreeOfParallelism; }
        }

        /// <summary>
        /// Queues a task to the scheduler.
        /// </summary>
        /// <param name="task">The task to be queued.</param>
        protected sealed override void QueueTask(Task task)
        {
            // Add the task to the list of tasks to be processed.  If there aren't enough
            // delegates currently queued or running to process tasks, schedule another.
            lock (this.tasks)
            {
                this.tasks.AddLast(task);
                if (this.delegatesQueuedOrRunning < this.maxDegreeOfParallelism)
                {
                    ++this.delegatesQueuedOrRunning;
                    NotifyThreadPoolOfPendingWork();
                }
            }
        }

        /// <summary>
        /// Attempts to execute the specified task on the current thread.
        /// </summary>
        /// <param name="task">The task to be executed.</param>
        /// <param name="taskWasPreviouslyQueued">A Boolean denoting whether or not task has previously been queued. If this parameter is True, then the task may have been previously queued (scheduled); if False, then the task is known not to have been queued, and this call is being made in order to execute the task inline without queuing it.</param>
        /// <returns>
        /// Whether the task could be executed on the current thread.
        /// </returns>
        /// <exception cref="T:System.ArgumentNullException">The <paramref name="task"/> argument is null.</exception>
        /// <exception cref="T:System.InvalidOperationException">The <paramref name="task"/> was already executed.</exception>
        protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // If this thread isn't already processing a task, we don't support inlining
            if (!currentThreadIsProcessingItems)
            {
                return false;
            }

            // If the task was previously queued, remove it from the queue
            if (taskWasPreviouslyQueued)
            {
                TryDequeue(task);
            }

            // Try to run the task.
            return TryExecuteTask(task);
        }

        /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
        /// <param name="task">The task to be removed.</param>
        /// <returns>Whether the task could be found and removed.</returns>
        protected sealed override bool TryDequeue(Task task)
        {
            lock (this.tasks) return this.tasks.Remove(task);
        }

        /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
        /// <returns>An enumerable of the tasks currently scheduled.</returns>
        protected sealed override IEnumerable<Task> GetScheduledTasks()
        {
            var lockTaken = false;
            try
            {
                Monitor.TryEnter(this.tasks, ref lockTaken);
                if (lockTaken)
                {
                    return this.tasks.ToArray();
                } else
                {
                    throw new NotSupportedException();
                }
            } finally
            {
                if (lockTaken)
                {
                    Monitor.Exit(this.tasks);
                }
            }
        }

        /// <summary>
        /// Informs the ThreadPool that there's work to be executed for this scheduler.
        /// </summary>
        private void NotifyThreadPoolOfPendingWork()
        {
            ThreadPool.UnsafeQueueUserWorkItem(
                _ =>
                {
                    // Note that the current thread is now processing work items.
                    // This is necessary to enable inlining of tasks into this thread.
                    currentThreadIsProcessingItems = true;
                    try
                    {
                        // Process all available items in the queue.
                        while (true)
                        {
                            Task item;
                            lock (this.tasks)
                            {
                                // When there are no more items to be processed,
                                // note that we're done processing, and get out.
                                if (this.tasks.Count == 0)
                                {
                                    --this.delegatesQueuedOrRunning;
                                    break;
                                }

                                // Get the next item from the queue
                                item = this.tasks.First.Value;
                                this.tasks.RemoveFirst();
                            }

                            // Execute the task we pulled out of the queue
                            TryExecuteTask(item);
                        }
                    } finally
                    {
                        // We're done processing items on the current thread
                        currentThreadIsProcessingItems = false;
                    }
                },
            null);
        }
    }


    /// <summary>
    /// Provides a task scheduler that ensures only one task is executing at a time, and that tasks
    /// execute in the order that they were queued.
    /// Source: http://msdn.microsoft.com/en-us/library/ee789351.aspx
    /// Author: Microsoft 2009
    /// </summary>
    public sealed class OrderedTaskScheduler : LimitedConcurrencyLevelTaskScheduler
    {
        /// <summary>
        /// Initializes a new instance of the <see cref="OrderedTaskScheduler"/> class.
        /// </summary>
        public OrderedTaskScheduler() : base(1)
        {
        }
    }



Finally I wrote a test program to prove two things. First, that the default Task Factory _might_ complete tasks in a different order than they were queued.  And second, that running with limited concurrency, i.e. 1 thread, guarantees the tasks are completed in the order they were queued.
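The ordering check itself is tiny. As a rough sketch (OrderCheck and IsInQueuedOrder are names invented here, not part of the test program), each task records its index as it runs, and the recorded sequence passes only if it reads 0, 1, 2, ... with no gaps or swaps:

```csharp
using System.Collections.Generic;

public static class OrderCheck
{
    // Returns true when the recorded indexes are exactly 0, 1, 2, ... in that order.
    // An empty sequence counts as in order, since nothing ran out of place.
    public static bool IsInQueuedOrder(IEnumerable<int> executionOrder)
    {
        int expected = 0;
        foreach (int value in executionOrder)
        {
            if (value != expected)
            {
                return false;
            }

            expected++;
        }

        return true;
    }
}
```

For example, a recorded order of 0, 1, 2, 3 passes, while 0, 2, 1, 3 fails at the second element.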

Here's my test console program:

namespace TaskInGauranteedOrder
{
    using System;
    using System.Collections.Concurrent;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks;

    public static class Program
    {
        public static void Main()
        {
            // Test the scheduler to ensure it's doing its job correctly
            SchedulerCheck(5);

            // Test Default Factory
            SequenceCheck(null, 500);

            // Test 1 Thread Factory
            SequenceCheck(new LimitedConcurrencyLevelTaskScheduler(1), 1000);
        }

        private static void SchedulerCheck(int maxThreads)
        {
            Console.WriteLine("**** Scheduler Check " + maxThreads + " threads maximum ****");
            var timer = Stopwatch.StartNew();
            var limitedScheduler = new LimitedConcurrencyLevelTaskScheduler(maxThreads);
            var factory = new TaskFactory(limitedScheduler);
            var bag = new ConcurrentDictionary<int, int>();

            var tasks = new Task[1500];
            for (int i = 0; i < 1500; i++)
            {
                tasks[i] = factory.StartNew(() =>
                {
                    int threadid = Thread.CurrentThread.ManagedThreadId;

                    // Console.WriteLine("{0} on thread {1}", i, threadid);
                    if (!bag.ContainsKey(threadid))
                    {
                        bag.TryAdd(threadid, threadid);
                    }
                });
            }
            
            Task.WaitAll(tasks);

            if (bag.Count > maxThreads)
            {
                Console.WriteLine(timer.ElapsedMilliseconds.ToString("#,000") + "ms. Scheduler Check FAILED.");
            } else if (bag.Count < maxThreads)
            {
                Console.WriteLine(timer.ElapsedMilliseconds.ToString("#,000") + "ms. Scheduler Check Warning: Fewer threads used than max specified " + bag.Count + "/" + maxThreads);
            } else
            {
                Console.WriteLine(timer.ElapsedMilliseconds.ToString("#,000") + "ms. Ok.");
            }

            Console.WriteLine();
        }

        private static void SequenceCheck(TaskScheduler scheduler, int iterations)
        {
            var timer = Stopwatch.StartNew();
            TaskFactory factory;
            if (scheduler == null)
            {
                factory = Task.Factory;
                Console.WriteLine("**** Sequence Check ...DEFAULT FACTORY... ****");
            } else
            {
                factory = new TaskFactory(scheduler);
                Console.WriteLine("**** Sequence Check... 1 THREAD FACTORY... ****");
            }

            Console.WriteLine(iterations + " iterations: ");
            bool failed = false;

            // Do it a few times to ensure consistency.
            for (int repeat = 0; repeat < iterations; repeat++)
            {
                var taskExecOrder = new ConcurrentQueue<int>();
                var tasks = new Task[10];
                for (int index = 0; index < 10; index++)
                {
                    int index1 = index;
                    tasks[index] = factory.StartNew(() => taskExecOrder.Enqueue(index1));
                }

                Task.WaitAll(tasks);

                int result = 0, last = -1;
                bool outOfOrder = false;
                while (taskExecOrder.TryDequeue(out result))
                {
                    outOfOrder = outOfOrder | (last != result - 1);
                    last = result;
                }

                if (outOfOrder)
                {
                    failed = true;
                } 

                if (repeat % 3 == 0)
                {
                    Console.Write(".");
                }
            }

            timer.Stop();
            Console.WriteLine();
            if (failed)
            {
                Console.WriteLine(timer.ElapsedMilliseconds.ToString("#,000") + "ms. FAILED! Out of Sequence Order.");
            } else
            {
                Console.WriteLine(timer.ElapsedMilliseconds.ToString("#,000") + "ms. Ok.");
            }

            Console.WriteLine();
        }
    }
}


And the output:
Using the default factory you can see that one or more tasks are completed out of order, hence the "FAILED" text. The check runs 500 iterations of 10 tasks each, so there's a good chance some will complete out of order if it's possible to do so. Lastly, using the Limited Concurrency Scheduler with 1 thread, the results are in the expected order.

Friday, December 24, 2010

Balloon Pop-ups and Toaster Pops

Recently I was looking into balloons, toaster-pops, and general system tray messages.  WPF doesn't give you much out of the box to get started, so I turned to Google to search for code samples or control libraries.  I found a fantastic free, open-source, all-in-one framework for all things balloons, system tray and toaster-pops.

Check it out here:
http://www.hardcodet.net/projects/wpf-notifyicon



So it looks cool, but how hard is it to make a quick and dirty sample application that shows a custom balloon pop-up out of the system tray? (And yes, it follows the tray if you move your task bar.)


  1. Create a new WPF project and reference the one Hardcodet.Wpf.TaskbarNotification DLL.
  2. Add this code to MainWindow.xaml:
    <TextBlock Text="Wait 6 seconds for the ring balloon popup to appear." />
    <tb:TaskbarIcon x:Name="tb" VerticalAlignment="Top" Visibility="Hidden" />
    
    
  3. Add this to the code behind:
    private DispatcherTimer timer;

    public MainWindow()
    {
        InitializeComponent();
        Loaded += OnLoaded;
    }

    private void OnLoaded(object sender, System.Windows.RoutedEventArgs e)
    {
        this.timer = new DispatcherTimer(new TimeSpan(0, 0, 6), DispatcherPriority.Normal, OnTimerTick, Dispatcher);
        this.timer.Start();
    }

    private void OnTimerTick(object sender, EventArgs e)
    {
        var balloon = new FancyBalloon { BalloonText = "Ring Ring" };
        tb.ShowCustomBalloon(balloon, PopupAnimation.Scroll, 3000);
    }
  4. Add a user control to define what you want your custom pop-up to look like.
    <UserControl x:Class="WpfBasicBalloon.FancyBalloon"
                 xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
                 xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
                 xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006" 
                 xmlns:d="http://schemas.microsoft.com/expression/blend/2008" 
                 mc:Ignorable="d" 
                 Height="300" Width="300">
        <Grid>
            <Border 
                Opacity="0.5"
                Background="Pink" 
                BorderBrush="Red"
                BorderThickness="2"
                CornerRadius="10"
                Margin="10">
                <Border.Effect>
                    <DropShadowEffect />
                </Border.Effect>
                <StackPanel Margin="10">
                    <TextBlock Text="Hello World - in pink just for Jo. :-)" />
                    <TextBlock Text="{Binding BalloonText}" />
                </StackPanel>
            </Border>
        </Grid>
    </UserControl>

Easy.


It's so little code that, given a few days of my cat walking randomly across my keyboard, there's a good chance he will come up with it on his own. Better chances than winning the lottery over the holiday break, anyway!

Happy holidays.

Wednesday, December 22, 2010

Extracting A ControlTemplate From An Existing WPF Control

There are a number of tools to do this, not least of which is Blend.  But sometimes you need some code to do it for you.  There have been a few instances where Blend was unable to extract the control template, and resorting to code is the last line of defense.

Here's a snippet of code from a test application where Group1 is an element (of type RibbonGroup).  This code comes from the code-behind of a XAML window.

            var template = this.Group1.Template;
            var xmlSettings = new XmlWriterSettings { Indent = true };

            var builder = new StringBuilder();

            // Dispose the writer before reading the builder so all buffered output is flushed.
            using (XmlWriter writer = XmlWriter.Create(builder, xmlSettings))
            {
                XamlWriter.Save(template, writer);
            }

            Clipboard.SetText(builder.ToString());
            Debug.WriteLine(builder.ToString());
This will output the control template XAML to the Debug output and also copy it to the clipboard.

Tuesday, December 21, 2010

Grrrr TFS Crashed Build Locked Files

Today I had to fix a build issue; these are few and far between these days with TFS 2010.  By and large I love it: it has a huge array of features and advancements over previous versions, and it sure beats the hell out of Subversion for features and ease of use.

The build crashed after someone managed to check in the wrong version of some DLLs, or corrupt ones. The result was files locked by the build machine that could not be rolled back or checked out.

When I attempted to check out the files to roll them back, they were of course still locked, and I got an error message of the form "...is locked for check-out by...".
This problem cannot be solved by using the IDE; only the command line can fix it.  The good news is it's totally doable, and the error message gives you the workspace name you need.

Jump into a Visual Studio 2010 Command Prompt window and type the following:

TF undo /workspace:<WorkspaceNameGivenInOutput> "<FullPathToFileToUndo>" /collection:<FullHttpUrlIncludingPortNumberAndCollectionProjectName>