Asynchronously posting to Azure Queues

Posted 2019-05-01 09:27

Question:

I'm trying to enqueue messages into Azure Queues asynchronously like this:

private async Task EnqueueItemsAsync(IEnumerable<string> messages) {
    var tasks = messages.Select(msg => _queue.AddMessageAsync(new CloudQueueMessage(msg),
        null, null, null, null));

    await Task.WhenAll(tasks);
}

If I understand it correctly, this says "start enqueuing the items one after the other without waiting for each one to be posted, keep a reference to each task, and then wait until all of them have been posted".

This code works fine in most cases, but for a large number of items (around 5,000) it starts enqueuing and then throws a timeout exception, after roughly 3,500 items have been enqueued.

I worked around it by awaiting each message before continuing with the next one:

private async Task EnqueueItemsAsync(IEnumerable<string> messages) {
    foreach (var message in messages) {
        await _queue.AddMessageAsync(new CloudQueueMessage(message), null, null, null, null);
    }
}

Can anyone explain why this happened?

Exception:

System.AggregateException, which wraps many exceptions like this one:

   at Microsoft.WindowsAzure.Storage.Core.Util.AsyncExtensions.<>c__DisplayClass4.<CreateCallbackVoid>b__3(IAsyncResult ar)
Request Information
RequestID:
RequestDate:
StatusMessage:
<--- --->
(Inner Exception #1) Microsoft.WindowsAzure.Storage.StorageException: The client could not finish the operation within specified timeout. ---> System.TimeoutException: The client could not finish the operation within specified timeout.
   --- End of inner exception stack trace ---
   at Microsoft.WindowsAzure.Storage.Core.Executor.Executor.EndExecuteAsync[T](IAsyncResult result)

Answer 1:

A single queue in Azure Storage is designed to handle a throughput of up to 2,000 messages per second.

See: Azure Storage Scalability and Performance Targets

When your application reaches the limit of what a partition can handle for your workload, Azure Storage will begin to return error code 503 (Server Busy) or error code 500 (Operation Timeout) responses. When this occurs, the application should use an exponential backoff policy for retries. The exponential backoff allows the load on the partition to decrease, and to ease out spikes in traffic to that partition.
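If you still want some parallelism, here is a sketch of what that could look like with the classic Microsoft.WindowsAzure.Storage SDK used in the question; the concurrency limit of 50 and the retry parameters (2-second delta, 5 attempts) are arbitrary assumptions you would tune for your workload:

// Sketch only: caps the number of in-flight AddMessageAsync calls and
// retries throttled/timed-out requests with exponential backoff.
// Requires the Microsoft.WindowsAzure.Storage.Queue and
// Microsoft.WindowsAzure.Storage.RetryPolicies namespaces.
private async Task EnqueueItemsThrottledAsync(IEnumerable<string> messages) {
    var options = new QueueRequestOptions {
        // Back off exponentially when the service returns 500/503.
        RetryPolicy = new ExponentialRetry(TimeSpan.FromSeconds(2), 5)
    };

    // Allow at most 50 concurrent requests (assumed value, tune as needed).
    var throttle = new SemaphoreSlim(50);

    var tasks = messages.Select(async msg => {
        await throttle.WaitAsync();
        try {
            await _queue.AddMessageAsync(new CloudQueueMessage(msg), null, null, options, null);
        }
        finally {
            throttle.Release();
        }
    });

    await Task.WhenAll(tasks);
}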



Answer 2:

It seems that you can make the call more robust by passing a QueueRequestOptions to AddMessageAsync.

Before the request is sent, these options are applied to the underlying command.

I would try passing a QueueRequestOptions with larger values for ServerTimeout and MaximumExecutionTime.
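For example (a sketch only; the timeout values are illustrative, not recommendations):

// Sketch: larger client- and server-side timeouts for a single AddMessageAsync call.
var options = new QueueRequestOptions {
    ServerTimeout = TimeSpan.FromSeconds(30),       // timeout for each request on the service side
    MaximumExecutionTime = TimeSpan.FromMinutes(5)  // overall client-side limit, including retries
};

await _queue.AddMessageAsync(new CloudQueueMessage(message), null, null, options, null);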

This is how the options are applied to the command before the request is sent:

// Microsoft.WindowsAzure.Storage.Queue.QueueRequestOptions
internal void ApplyToStorageCommand<T>(RESTCommand<T> cmd)
{
    if (this.LocationMode.HasValue)
    {
        cmd.LocationMode = this.LocationMode.Value;
    }
    if (this.ServerTimeout.HasValue)
    {
        cmd.ServerTimeoutInSeconds = new int?((int)this.ServerTimeout.Value.TotalSeconds);
    }
    if (this.OperationExpiryTime.HasValue)
    {
        cmd.OperationExpiryTime = this.OperationExpiryTime;
        return;
    }
    if (this.MaximumExecutionTime.HasValue)
    {
        cmd.OperationExpiryTime = new DateTime?(DateTime.Now + this.MaximumExecutionTime.Value);
    }
}

And this is how the response is validated after the message is sent:

rESTCommand.PreProcessResponse = delegate(RESTCommand<NullType> cmd, HttpWebResponse resp, Exception ex, OperationContext ctx)
{
    HttpResponseParsers.ProcessExpectedStatusCodeNoException<NullType>(HttpStatusCode.Created, resp, NullType.Value, cmd, ex);
    return NullType.Value;
};