We have a queue into which we push work items from multiple processes/threads.
We also have a work dispatching thread that reads from the queue in batches (yes, we're talking Azure Queues) and dispatches the work to a thread pool using Task.Run(). We keep a count of active tasks and limit the number of messages we dequeue from the queue not to exceed a sensible pressure on the thread pool (yes, we have more than one process running the dispatcher).
Now, what I want is to run a small piece of task completion code when each tasks finishes execution but I want it on the dispatcher thread (I'm not sure I really want this but let's assume I do). Here is what my code looks like (simplified):
while(queue.TryRead(batchSize, out entries))
{
foreach(var entry in entries)
{
Task.Run(()=>ProcessEntry(entry))
.ContinueWith(()=>
{
CompleteEntry(entry); // I want this on the same thread as queue.TryRead above
});
}
}
If this was a WinForms or WPF application I could capture WindowsFormsSynchronizationContext or DispatcherSynchronizationContext when scheduling my continuations which would do what I want but this is a WorkerRole I'm running this in. I think it should be possible to write a custom scheduler that marshals the continuations to the dispatcher thread assuming the thread would proactively participate in the process.
As a proof of concept we've written this:
class MyScheduler : TaskScheduler
{
private readonly ConcurrentQueue tasks = new ConcurrentQueue();
protected override void QueueTask(Task task)
{
this.tasks.Enqueue(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
protected override IEnumerable GetScheduledTasks()
{
return this.tasks;
}
public void ProcessPendingTasks()
{
Task task;
while (tasks.TryDequeue(out task))
{
base.TryExecuteTask(task);
}
}
}
and modified the code to schedule the continuations on this custom scheduler:
var myScheduler = new MyScheduler();
while(queue.TryRead(batchSize, out entries))
{
foreach(var entry in entries)
{
Task.Run(()=>ProcessEntry(entry))
.ContinueWith((t)=>
{
CompleteEntry(entry);
}, myScheduler); // Schedule the continuation on my custom scheduler
}
myScheduler.ProcessPendingTasks(); // This is where we actually run the continuations
}
With a bit of error handling code this seems to be working correctly but we have not tested it with any serious load or for long enough to say if this is a good idea.
No comments:
Post a Comment