First, we define a contract that will be implemented by a queue listener component that is hosted by a worker role and listens on a Windows Azure queue.
/// Defines a contract that must be implemented by an extension responsible for listening on a Windows Azure queue.
public interface ICloudQueueServiceWorkerRoleExtension {
/// Starts a multi-threaded queue listener that uses the specified number of dequeue threads.
void StartListener(int threadCount);
Note
/// Returns the current state of the queue listener to determine point-in-time load characteristics.
CloudQueueListenerInfo QueryState();
/// Gets or sets the batch size when performing dequeue operation against a Windows Azure queue.
int DequeueBatchSize { get; set; }
/// Gets or sets the default interval that defines how long a queue listener will be idle for between polling a queue.
TimeSpan DequeueInterval { get; set; }
/// Defines a callback delegate which will be invoked whenever the queue is empty.
event WorkCompletedDelegate QueueEmpty;
}
The QueueEmpty event is intended to be used by a host. It provides the mechanism for the host to control the behavior of the queue listener when the queue is empty. The respective event delegate is defined as follows:
/// <summary>
/// Defines a callback delegate which will be invoked whenever an unit of work has been completed and the worker is
/// requesting further instructions as to next steps.
/// </summary>
/// <param name="sender">The source of the event.</param>
/// <param name="idleCount">The value indicating how many times the worker has been idle.</param>
/// <param name="delay">Time interval during which the worker is instructed to sleep before performing next unit of work.</param>
/// <returns>A flag indicating that the worker should stop processing any further units of work and must terminate.</returns>
public delegate bool WorkCompletedDelegate(object sender, int idleCount, out TimeSpan delay);
Handling queue items is easier if a listener can operate with generics as opposed to using “bare metal” SDK classes such as CloudQueueMessage. Therefore, we define a new interface that will be implemented by a queue listener capable of supporting generics-based access to queues:
/// <summary>
/// Defines a contract that must be supported by an extension that implements a generics-aware queue listener.
/// </summary>
/// <typeparam name="T">The type of queue item data that will be handled by the queue listener.</typeparam>
public interface ICloudQueueListenerExtension<T> : ICloudQueueServiceWorkerRoleExtension, IObservable<T>
{ }
Note that we also enabled the generics-aware listener to push queue items to one or more subscribers through the implementation of the Observer design pattern by leveraging the IObservable<T> interface available in the .NET Framework 4.
We intend to keep a single instance of a component implementing the
ICloudQueueListenerExtension<T> interface. However, we need to be able to run multiple dequeue threads (worker processes, or tasks for simplicity). Therefore, we add support for multi-threaded dequeue logic in the queue listener component. This is where we take advantage of the Task Parallel Library (TPL). The StartListener method will be responsible for spinning up the specified number of dequeue threads as follows:
/// <summary>
/// Starts the specified number of dequeue tasks.
/// </summary>
/// <param name="threadCount">The number of dequeue tasks.</param>
public void StartListener(int threadCount) {
Guard.ArgumentNotZeroOrNegativeValue(threadCount, "threadCount");
// The collection of dequeue tasks needs to be reset on each call to this method.
if (this.dequeueTasks.IsAddingCompleted) {
this.dequeueTasks = new BlockingCollection<Task>(this.dequeueTaskList);
}
for (int i = 0; i < threadCount; i++) {
CancellationToken cancellationToken = this.cancellationSignal.Token;
CloudQueueListenerDequeueTaskState<T> workerState = new
CloudQueueListenerDequeueTaskState<T>(Subscriptions, cancellationToken, this.queueLocation, this.queueStorage);
// Start a new dequeue task and register it in the collection of tasks internally managed by this component.
this.dequeueTasks.Add(Task.Factory.StartNew(DequeueTaskMain, workerState, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
}
this.dequeueTasks.CompleteAdding();
}
The DequeueTaskMain method implements the functional body of a dequeue thread. Its main operations are the following:
/// <summary>
/// Implements a task performing dequeue operations against a given Windows Azure queue.
/// </summary>
/// <param name="state">An object containing data to be used by the task.</param>
private void DequeueTaskMain(object state) {
CloudQueueListenerDequeueTaskState<T> workerState = (CloudQueueListenerDequeueTaskState<T>)state;
int idleStateCount = 0;
TimeSpan sleepInterval = DequeueInterval;
try {
// Run a dequeue task until asked to terminate or until a break condition is encountered.
while (workerState.CanRun) {
try {
var queueMessages = from msg in
workerState.QueueStorage.Get<T>(workerState.QueueLocation.QueueName, DequeueBatchSize, workerState.QueueLocation.VisibilityTimeout).AsParallel() where msg != null select msg;
int messageCount = 0;
// Process the dequeued messages concurrently by taking advantage of the above PLINQ query.
queueMessages.ForAll((message) =>
{
// Reset the count of idle iterations.
idleStateCount = 0;
// Notify all subscribers that a new message requires processing.
workerState.OnNext(message);
// Once successful, remove the processed message from the queue.
workerState.QueueStorage.Delete<T>(message);
// Increment the number of processed messages.
messageCount++;
});
// Check whether or not we have done any work during this iteration.
if (0 == messageCount) {
// Increment the number of iterations when we were not doing any work (e.g. no messages were dequeued).
idleStateCount++;
// Call the user-defined delegate informing that no more work is available.
if (QueueEmpty != null) {
// Check if the user-defined delegate has requested a halt to any further work processing.
if (QueueEmpty(this, idleStateCount, out sleepInterval)) {
// Terminate the dequeue loop if user-defined delegate advised us to do so.
break;
} }
// Enter the idle state for the defined interval.
Thread.Sleep(sleepInterval);
} }
catch (Exception ex) {
if (ex is OperationCanceledException) {
throw;
} else {
// Offload the responsibility for handling or reporting the error to the external object.
workerState.OnError(ex);
Thread.Sleep(sleepInterval);
} } } } finally {
workerState.OnCompleted();
} }
A couple of points are worth making with respect to the DequeueTaskMain method implementation.
First, we are taking advantage of the Parallel LINQ (PLINQ) when dispatching messages for processing. The main advantage of PLINQ here is to speed up message handling by executing the query delegate on separate worker threads on multiple processors in parallel whenever possible.
Since query parallelization is internally managed by PLINQ, there is no guarantee that PLINQ will utilize more than a single core for work parallelization. PLINQ may run a query sequentially if it determines that the overhead of parallelization will slow down the query.
In order to benefit from PLINQ, the total work in the query has to be sufficiently large to benefit from the overhead of scheduling the work on the thread pool.
Second, we are not fetching a single message at a time. Instead, we ask the Queue Service API to retrieve a specific number of messages from a queue. This is driven by the DequeueBatchSize parameter that is passed to the Get<T> method. When we enter the storage abstraction layer implemented as part of the overall solution, this parameter is handed over to the Queue Service API method. In addition, we run a safety check to ensure that the batch size doesn’t exceed the maximum size supported by the APIs. This is implemented as follows:
/// This class provides reliable generics-aware access to the Windows Azure Queue storage.
public sealed class ReliableCloudQueueStorage : ICloudQueueStorage {
/// The maximum batch size supported by Queue Service API in a single Get operation.
private const int MaxDequeueMessageCount = 32;
/// Gets a collection of messages from the specified queue and applies the specified visibility timeout.
public IEnumerable<T> Get<T>(string queueName, int count, TimeSpan visibilityTimeout) {
Guard.ArgumentNotNullOrEmptyString(queueName, "queueName");
Guard.ArgumentNotZeroOrNegativeValue(count, "count");
Note
try {
var queue =
this.queueStorage.GetQueueReference(CloudUtility.GetSafeContainerName(queueName));
IEnumerable<CloudQueueMessage> queueMessages =
this.retryPolicy.ExecuteAction<IEnumerable<CloudQueueMessage>>(() =>
{
return queue.GetMessages(Math.Min(count, MaxDequeueMessageCount), visibilityTimeout);
});
// ... There is more code after this point ...
And finally, we are not going to run the dequeue task indefinitely. We provisioned an explicit checkpoint implemented as a QueueEmpty event which is raised whenever a queue becomes empty. At that point, we consult to a QueueEmpty event handler to determine whether or not it permits us to finish the running dequeue task. A well-designed implementation of the
QueueEmpty event handler allows supporting the “auto scale-down” capability as explained in the following section.