Azure Service Bus message lock not being renewed?
I built a service to support multiple queue subscriptions in Azure Service Bus, but I'm getting some odd behavior.

My subscription singleton class has a method that looks like this:

    public void Subscribe<TMessage>(Func<TMessage, Task> execution, int maxDop = 1, int ttl = 60) where TMessage : IServiceBusMessage
    {
        try
        {
            var messageLifespan = TimeSpan.FromSeconds(ttl);
            var messageType = typeof(TMessage);
            if (!_activeSubscriptionClients.TryGetValue(messageType, out var subscriptionClient))
            {
                subscriptionClient = _subscriptionClientFactory.Create(typeof(TMessage)).GetAwaiter().GetResult();
                if (subscriptionClient.OperationTimeout < messageLifespan) subscriptionClient.OperationTimeout = messageLifespan;
                if (subscriptionClient.ServiceBusConnection.OperationTimeout < messageLifespan)
                    subscriptionClient.ServiceBusConnection.OperationTimeout = messageLifespan;
                _activeSubscriptionClients.AddOrUpdate(messageType, subscriptionClient, (key, value) => value);
            }

            var messageHandlerOptions = new MessageHandlerOptions(OnException)
            {
                MaxConcurrentCalls = maxDop,
                AutoComplete = false,
                MaxAutoRenewDuration = messageLifespan,
            };


            subscriptionClient.RegisterMessageHandler(
                async (azureMessage, cancellationToken) =>
                {
                    try
                    {
                        var textPayload = _encoding.GetString(azureMessage.Body);
                        var message = JsonConvert.DeserializeObject<TMessage>(textPayload);
                        if (message == null)
                            throw new FormatException($"Cannot deserialize the message payload to type '{typeof(TMessage).FullName}'.");
                        await execution.Invoke(message);
                        await subscriptionClient.CompleteAsync(azureMessage.SystemProperties.LockToken);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "ProcessMessagesAsync(Message, CancellationToken)");
                        await subscriptionClient.AbandonAsync(azureMessage.SystemProperties.LockToken);
                    }
                }
                , messageHandlerOptions);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Subscribe(Action<TMessage>)");
            throw;
        }
    }

The idea is that you subscribe to Azure Service Bus for a specific type of message, which corresponds directly to a queue. In your subscription, you pass in a delegate for how to process the message.
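
For illustration, a consumer ends up looking roughly like this (the message type, handler method, and parameter values here are placeholders, not my real code):

    // Hypothetical message type; the real ones just implement IServiceBusMessage.
    public class OrderCreatedMessage : IServiceBusMessage
    {
        public Guid OrderId { get; set; }
    }

    // Subscribe to the queue that corresponds to OrderCreatedMessage,
    // processing one message at a time and allowing up to 5 minutes per message.
    _subscriptions.Subscribe<OrderCreatedMessage>(
        async message => await ProcessOrderAsync(message),   // long-running work goes here
        maxDop: 1,
        ttl: 300);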

This seems to work... with one caveat.

Regardless of what I set ttl (and therefore MaxAutoRenewDuration and OperationTimeout) to, on a long-running process the lock on any given message is released after about a minute, and another subscriber picks the message up and starts processing it.

My understanding is that this is exactly what MaxAutoRenewDuration is supposed to prevent... but it doesn't seem to prevent anything.

Can anyone tell me what I need to do differently to make sure the consumer owns the message through to completion?

Kaput answered 29/1, 2019 at 20:23 Comment(8)
What does your subscription description look like (LockDuration specifically)? Did you try it with another entity? Premium or standard namespace? And what version of the client are you using? - Gi
With LockDuration set to something like 60 seconds and MaxAutoRenewDuration set to 2 minutes, I was able to process messages in 60+ seconds and couldn't reproduce the issue you're experiencing. If you'd like to share your project on GitHub, I could have a look. I suspect it's the entity that is causing the issue. - Gi
@SeanFeldman, I can't find any property for LockDuration... are you referring to the Azure queue itself? That's set by default to 60 seconds. - Kaput
I'm not clear on what you mean by the "entity" causing the issue. There are no entities in this, just a simple POCO message class. - Kaput
A Service Bus entity is a queue/topic/subscription. So I was not able to reproduce this with a LockDuration identical to yours. I suspect it's a problem with the subscription itself. Have you tried deleting and recreating it? - Gi
@SeanFeldman I have not, but I will. I will let you know if that's effective at all. - Kaput
Hey @JeremyHolovacs, are you sure that this is the role of MaxAutoRenewDuration? Is it supposed to renew the lock duration so other processes won't start processing the message? From the docs: "The maximum duration during which locks are automatically renewed" - what does that mean? - Christa
Not clear... all I know is that it's flaky, to the point of being unreliable. We're actually moving our long-running processes to Azure Storage Queues now to avoid this issue. - Kaput
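
For reference on the LockDuration question above: it is a property of the queue or subscription itself, not of the client. With the Microsoft.Azure.ServiceBus package (which the question's code appears to use), it can be read and raised through the ManagementClient. A rough sketch, with the connection string and entity name as placeholders, and noting the service caps LockDuration at 5 minutes:

    // Requires the Microsoft.Azure.ServiceBus.Management namespace; run inside an async method.
    var management = new ManagementClient("<connection-string>");   // placeholder
    var queue = await management.GetQueueAsync("<queue-name>");     // placeholder

    // The client renews the lock in LockDuration-sized increments, up to MaxAutoRenewDuration.
    Console.WriteLine(queue.LockDuration);

    queue.LockDuration = TimeSpan.FromMinutes(5);   // service-side maximum
    await management.UpdateQueueAsync(queue);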

It turns out the remote process that the consumer was running was failing silently and not returning a failure status code (or anything else); the auto-refresh mechanism hung waiting for the result, so the message ended up timing out.

I'm not clear on how to prevent that, but once I fixed the issue on the remote process, the problem was no longer reproducible.

Moral of the story: if everything looks right and the message is still timing out, it seems the auto-renewal mechanism shares resources with the asynchronous operations you are awaiting, so that is another place to look for failures.
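
Not part of the fix itself, but a sketch of the kind of guard I mean: give the remote call its own timeout so a silently hung dependency surfaces as an exception instead of the handler sitting there until the lock finally expires. RunWithTimeoutAsync and the timeout value are my own placeholders:

    // Hypothetical guard: fail fast if the remote call never returns,
    // instead of letting the handler (and the lock renewal) hang indefinitely.
    private static async Task RunWithTimeoutAsync(Func<Task> work, TimeSpan timeout)
    {
        var workTask = work();
        var finished = await Task.WhenAny(workTask, Task.Delay(timeout));
        if (finished != workTask)
            throw new TimeoutException($"Operation did not complete within {timeout}.");
        await workTask;   // propagate any exception from the remote call
    }

In the message handler above, wrapping the execution.Invoke(message) call like await RunWithTimeoutAsync(() => execution.Invoke(message), TimeSpan.FromSeconds(ttl)) would turn a hung dependency into an abandoned (and retried) message rather than a silently expired lock.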

Kaput answered 4/2, 2019 at 1:7 Comment(0)

There are a few options I can think of that you might want to look at.

  1. Instead of using the default ReceiveMode = PeekLock on the SubscriptionClient, set it to ReceiveAndDelete. Once a message is consumed it is removed from the queue and won't be consumed by any other client; this does mean you have to handle exceptions gracefully and perform retries yourself (see the sketch below this list).

  2. Have a look at OperationTimeout, which according to the docs is the "Duration after which individual operations will timeout".
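
A rough sketch of option 1, assuming the same Microsoft.Azure.ServiceBus client the question uses; the connection string, topic, and subscription names are placeholders, and OnException / ProcessAsync stand in for the question's exception handler and actual processing:

    // ReceiveAndDelete hands the message to the client and removes it from the
    // subscription immediately, so there is no lock to renew and no redelivery.
    var subscriptionClient = new SubscriptionClient(
        "<connection-string>",      // placeholder
        "<topic-name>",             // placeholder
        "<subscription-name>",      // placeholder
        ReceiveMode.ReceiveAndDelete);

    subscriptionClient.RegisterMessageHandler(
        async (message, cancellationToken) =>
        {
            // No CompleteAsync/AbandonAsync: the message is already gone, so any
            // failure handling and retry logic has to live inside this delegate.
            await ProcessAsync(message);    // placeholder for the actual work
        },
        new MessageHandlerOptions(OnException) { AutoComplete = false, MaxConcurrentCalls = 1 });

As the comments below point out, this trades the lock problem for at-most-once delivery, so it only fits if losing a message on failure is acceptable.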

Recollected answered 29/1, 2019 at 23:30 Comment(3)
I would NOT recommend using ReceiveAndDelete in this scenario. That's not a good recommendation, as the auto-renewal feature works, and for reliable processing PeekLock should be used. - Gi
@SeanFeldman I agree. Those were just some potential solutions I quickly thought of, so I put them out there. There are scenarios where they fit. - Recollected
Yeah, we would not be able to use ReceiveAndDelete. I agree this would circumvent the problem, but only by giving me another problem. - Kaput