Tuesday, September 09, 2014

Completing tasks dispatched to worker threads on the dispatcher thread

The context:

We have a queue into which we push work items from multiple processes/threads.

We also have a work dispatching thread that reads from the queue in batches (yes, we're talking Azure Queues) and dispatches the work to a thread pool using Task.Run(). We keep a count of active tasks and limit the number of messages we dequeue from the queue not to exceed a sensible pressure on the thread pool (yes, we have more than one process running the dispatcher).

Now, what I want is to run a small piece of task completion code when each tasks finishes execution but I want it on the dispatcher thread (I'm not sure I really want this but let's assume I do). Here is what my code looks like (simplified):



while(queue.TryRead(batchSize, out entries))
{
    foreach(var entry in entries)
    {
        Task.Run(()=>ProcessEntry(entry))
            .ContinueWith(()=>
            {
                CompleteEntry(entry);  // I want this on the same thread as queue.TryRead above
            });
    }
}




If this was a WinForms or WPF application I could capture WindowsFormsSynchronizationContext or DispatcherSynchronizationContext when scheduling my continuations which would do what I want but this is a WorkerRole I'm running this in. I think it should be possible to write a custom scheduler that marshals the continuations to the dispatcher thread assuming the thread would proactively participate in the process.

As a proof of concept we've written this:



class MyScheduler : TaskScheduler
{
    private readonly ConcurrentQueue tasks = new ConcurrentQueue();

    protected override void QueueTask(Task task)
    {
        this.tasks.Enqueue(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    protected override IEnumerable GetScheduledTasks()
    {
        return this.tasks;
    }

    public void ProcessPendingTasks()
    {
        Task task;
        while (tasks.TryDequeue(out task))
        {
            base.TryExecuteTask(task);
        }
    }
}




and modified the code to schedule the continuations on this custom scheduler:



var myScheduler = new MyScheduler();

while(queue.TryRead(batchSize, out entries))
{
    foreach(var entry in entries)
    {
        Task.Run(()=>ProcessEntry(entry))
            .ContinueWith((t)=>
            {
                CompleteEntry(entry); 

            }, myScheduler);   // Schedule the continuation on my custom scheduler
    }

    myScheduler.ProcessPendingTasks();   // This is where we actually run the continuations
}



With a bit of error handling code this seems to be working correctly but we have not tested it with any serious load or for long enough to say if this is a good idea.