I built a service to support multiple queue subscriptions in Azure Service Bus, but I'm getting some odd behavior.
My subscription singleton class has a method that looks like this:
    public void Subscribe<TMessage>(Func<TMessage, Task> execution, int maxDop = 1, int ttl = 60) where TMessage : IServiceBusMessage
    {
        try
        {
            var messageLifespan = TimeSpan.FromSeconds(ttl);
            var messageType = typeof(TMessage);
            if (!_activeSubscriptionClients.TryGetValue(messageType, out var subscriptionClient))
            {
                subscriptionClient = _subscriptionClientFactory.Create(typeof(TMessage)).GetAwaiter().GetResult();
                if (subscriptionClient.OperationTimeout < messageLifespan) subscriptionClient.OperationTimeout = messageLifespan;
                if (subscriptionClient.ServiceBusConnection.OperationTimeout < messageLifespan)
                    subscriptionClient.ServiceBusConnection.OperationTimeout = messageLifespan;
                _activeSubscriptionClients.AddOrUpdate(messageType, subscriptionClient, (key, value) => value);
            }
            var messageHandlerOptions = new MessageHandlerOptions(OnException)
            {
                MaxConcurrentCalls = maxDop,
                AutoComplete = false,
                MaxAutoRenewDuration = messageLifespan,
            };
            subscriptionClient.RegisterMessageHandler(
                async (azureMessage, cancellationToken) =>
                {
                    try
                    {
                        var textPayload = _encoding.GetString(azureMessage.Body);
                        var message = JsonConvert.DeserializeObject<TMessage>(textPayload);
                        if (message == null)
                            throw new FormatException($"Cannot deserialize the message payload to type '{typeof(TMessage).FullName}'.");
                        await execution.Invoke(message);
                        await subscriptionClient.CompleteAsync(azureMessage.SystemProperties.LockToken);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "ProcessMessagesAsync(Message, CancellationToken)");
                        await subscriptionClient.AbandonAsync(azureMessage.SystemProperties.LockToken);
                    }
                },
                messageHandlerOptions);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Subscribe(Action<TMessage>)");
            throw;
        }
    }
The idea is, you subscribe to Azure Service Bus for a specific type of message, and that directly corresponds to a queue. In your subscription, you pass in a delegate for how to process the message.
This seems to work... with one caveat.
Regardless of what I set the ttl to (it drives both MaxAutoRenewDuration and OperationTimeout), when processing a given message takes a long time, the message is unlocked after a minute and another subscriber picks it up and starts processing it.
My understanding is that this is exactly what MaxAutoRenewDuration is supposed to prevent... but it doesn't seem to prevent anything.
Can anyone tell me what I need to do differently to make sure the consumer owns the message through to completion?
Comments:

[...] (LockDuration specifically)? Did you try it with another entity? Premium or standard namespace? And the version of the client you're using. – Gi

[...] LockDuration set to something like 60 seconds, MaxAutoRenewDuration set to 2 minutes, I was able to process messages in 60+ seconds and couldn't reproduce the issue you're experiencing. If you'd like to share your project on GitHub, could have a look. Suspect it's the entity that is causing the issue. – Gi

[...] LockDuration ... are you referring to the Azure queue itself? That's set by default to 60 seconds. – Kaput

[...] LockDuration identical to what you have. Suspect it's the subscription issue. Have you tried to delete and recreate it? – Gi

[...] MaxAutoRenewDuration, it is supposed to renew the lock duration so other processes won't start processing it? from doc: The maximum duration during which locks are automatically renewed - what does it mean? – Christa
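
Since the comments keep coming back to LockDuration, here is a minimal sketch of how one might read that setting straight from the entity, to confirm what the broker is actually using. LockDuration is configured on the queue or subscription itself, while MaxAutoRenewDuration is a client-side handler option that controls how long the client keeps renewing that lock. The sketch assumes the Microsoft.Azure.ServiceBus package and a connection string with Manage rights; the environment variable name, topic name, and subscription name are hypothetical placeholders, not values from the question.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus.Management;

public static class LockDurationCheck
{
    public static async Task Main()
    {
        // Hypothetical placeholders: substitute your own connection string and entity names.
        var connectionString = Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING");
        const string topicPath = "my-topic";
        const string subscriptionName = "my-subscription";

        var managementClient = new ManagementClient(connectionString);
        try
        {
            // Fetch the subscription's runtime configuration from the namespace.
            SubscriptionDescription description =
                await managementClient.GetSubscriptionAsync(topicPath, subscriptionName);

            // This is the entity-level lock window that the client must renew
            // for any handler that runs longer than it.
            Console.WriteLine($"LockDuration: {description.LockDuration}");
        }
        finally
        {
            await managementClient.CloseAsync();
        }
    }
}
```

If the printed value matches the roughly one-minute redelivery described in the question, that would suggest the lock is not being renewed at all, which narrows the problem to the client side rather than the entity configuration.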