I built a service to support multiple queue subscriptions in Azure Service Bus, but I'm getting some odd behavior.
My subscription singleton class has a method that looks like this:
    public void Subscribe<TMessage>(Func<TMessage, Task> execution, int maxDop = 1, int ttl = 60) where TMessage : IServiceBusMessage
    {
        try
        {
            var messageLifespan = TimeSpan.FromSeconds(ttl);
            var messageType = typeof(TMessage);
            if (!_activeSubscriptionClients.TryGetValue(messageType, out var subscriptionClient))
            {
                subscriptionClient = _subscriptionClientFactory.Create(typeof(TMessage)).GetAwaiter().GetResult();
                if (subscriptionClient.OperationTimeout < messageLifespan) subscriptionClient.OperationTimeout = messageLifespan;
                if (subscriptionClient.ServiceBusConnection.OperationTimeout < messageLifespan)
                    subscriptionClient.ServiceBusConnection.OperationTimeout = messageLifespan;
                _activeSubscriptionClients.AddOrUpdate(messageType, subscriptionClient, (key, value) => value);
            }
            var messageHandlerOptions = new MessageHandlerOptions(OnException)
            {
                MaxConcurrentCalls = maxDop,
                AutoComplete = false,
                MaxAutoRenewDuration = messageLifespan,
            };
            subscriptionClient.RegisterMessageHandler(
                async (azureMessage, cancellationToken) =>
                {
                    try
                    {
                        var textPayload = _encoding.GetString(azureMessage.Body);
                        var message = JsonConvert.DeserializeObject<TMessage>(textPayload);
                        if (message == null)
                            throw new FormatException($"Cannot deserialize the message payload to type '{typeof(TMessage).FullName}'.");
                        await execution.Invoke(message);
                        await subscriptionClient.CompleteAsync(azureMessage.SystemProperties.LockToken);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "ProcessMessagesAsync(Message, CancellationToken)");
                        await subscriptionClient.AbandonAsync(azureMessage.SystemProperties.LockToken);
                    }
                }
                , messageHandlerOptions);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Subscribe(Action<TMessage>)");
            throw;
        }
    }
The idea is, you subscribe to Azure Service Bus for a specific type of message, and that directly corresponds to a queue. In your subscription, you pass in a delegate for how to process the message.
This seems to work... with one caveat.
Regardless of what ttl I set for MaxAutoRenewDuration or OperationTimeout, when processing a given message takes a long time, the message is unlocked from the queue after a minute and another subscriber picks it up and starts processing it.
My understanding is that this is exactly what MaxAutoRenewDuration is supposed to prevent, but it doesn't seem to prevent anything.
Can anyone tell me what I need to do differently to make sure the consumer owns the message through to completion?
It turns out the remote process that the consumer was calling was failing silently and not returning a failure status code (or anything else). The auto-renew mechanism hung waiting for the result, so the message lock ended up timing out.
I'm not clear on how to prevent that, but once I fixed the issue in the remote process, the problem was no longer reproducible.
Moral of the story: if everything looks right and it's still timing out, it seems the auto-renew mechanism shares some resources with the asynchronous operations you are waiting on. That may be another place to look for failures.
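One way to keep a silently hung remote call from stalling the handler indefinitely is to put an upper bound on how long the handler waits. The following is only a minimal sketch, not something taken from the Service Bus SDK: HandlerTimeout and RunWithTimeoutAsync are hypothetical names, and it uses nothing beyond the standard Task APIs. Whether it would have avoided the lock loss described above is an assumption, but it at least turns a silent hang into an exception that the existing catch block can log before abandoning the message.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of Microsoft.Azure.ServiceBus).
public static class HandlerTimeout
{
    // Runs 'work' and throws TimeoutException if it has not finished within 'timeout'.
    // The token passed to 'work' is cancelled on timeout so cooperative code can stop.
    public static async Task RunWithTimeoutAsync(Func<CancellationToken, Task> work, TimeSpan timeout)
    {
        using (var cts = new CancellationTokenSource())
        {
            var workTask = work(cts.Token);
            var timerTask = Task.Delay(timeout, cts.Token);

            var finished = await Task.WhenAny(workTask, timerTask);
            if (finished != workTask)
            {
                cts.Cancel(); // ask the work to stop; it may ignore the request
                throw new TimeoutException($"Handler did not finish within {timeout}.");
            }

            cts.Cancel();   // stop the timer
            await workTask; // rethrow any exception raised by the work itself
        }
    }
}
```

Inside the message handler from the question, the call `await execution.Invoke(message);` could then become `await HandlerTimeout.RunWithTimeoutAsync(_ => execution.Invoke(message), messageLifespan);`. Note that the delegate in the question does not accept a cancellation token, so the remote call itself would keep running in the background after the timeout.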
There are a few options I can think of that you might want to look at.
Instead of using the default ReceiveMode = PeekLock in the SubscriptionClient, set it to ReceiveAndDelete, so that once a message is consumed it is removed from the queue and won't be consumed by any other client. This does mean that you have to handle exceptions gracefully and perform retries yourself.
Have a look at OperationTimeout, which according to the docs is the "Duration after which individual operations will timeout".
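For the ReceiveAndDelete option, the broker no longer holds a lock on the message, so the CompleteAsync and AbandonAsync calls from the question would no longer apply and any retry has to happen inside the consumer. The following is a rough sketch of such a retry loop, assuming a fixed delay between attempts is acceptable. RetryHelper and RetryAsync are hypothetical names, not SDK types.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (not part of Microsoft.Azure.ServiceBus).
public static class RetryHelper
{
    // Invokes 'action' up to 'maxAttempts' times, waiting 'delayBetweenAttempts'
    // after each failure. The exception from the final attempt is rethrown.
    public static async Task RetryAsync(Func<Task> action, int maxAttempts, TimeSpan delayBetweenAttempts)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await action();
                return; // success, stop retrying
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // Swallow the failure and try again after a pause.
                await Task.Delay(delayBetweenAttempts);
            }
        }
    }
}
```

In the handler this could wrap the delegate, for example `await RetryHelper.RetryAsync(() => execution.Invoke(message), 3, TimeSpan.FromSeconds(2));`. If every attempt fails, the message is already gone from the broker, so the consumer would also need to log it or store it somewhere for later inspection.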