Can the Azure Service Bus be delayed before retrying a message?

The Azure Service Bus supports a built-in retry mechanism which makes an abandoned message immediately visible for another read attempt. I'm trying to use this mechanism to handle some transient errors, but the message is made available immediately after being abandoned.

What I would like to do is make the message invisible for a period of time after it is abandoned, preferably based on an exponentially incrementing policy.

I've tried to set the ScheduledEnqueueTimeUtc property when abandoning the message, but it doesn't seem to have an effect:

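using System;
using System.Collections.Generic;
using Microsoft.ServiceBus.Messaging;

var messagingFactory = MessagingFactory.CreateFromConnectionString(...);

var receiver = messagingFactory.CreateMessageReceiver("test-queue");

receiver.OnMessageAsync(async brokeredMessage =>
{
    // Attempt to delay redelivery by 30 seconds when abandoning.
    // This has no visible effect: the message is redelivered immediately.
    await brokeredMessage.AbandonAsync(
        new Dictionary<string, object>
        {
            { "ScheduledEnqueueTimeUtc", DateTime.UtcNow.AddSeconds(30) }
        });
});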

I've considered not abandoning the message at all and just letting the lock expire, but this would require having some way to influence how the MessageReceiver specifies the lock duration on a message, and I can't find anything in the API to let me change this value. In addition, it wouldn't be possible to read the delivery count of the message (and therefore decide how long to wait before the next retry) until after the lock has already been acquired.

Can the retry policy in the Service Bus be influenced in some way, or can a delay be artificially introduced in some other way?

Dahlgren answered 3/2, 2014 at 20:9 Comment(2)
Since there aren't any immediately great answers to this, I've opened a request on the Service Bus User Voice: feedback.windowsazure.com/forums/216926-service-bus/suggestions/…. Feel free to support or ignore. - Dahlgren
Updated link for historical reference: feedback.azure.com/forums/216926-service-bus/suggestions/… - Pops
D
15

I actually asked this same question last year (implementation aside) with the three approaches I could think of looking at the API. @ClemensVasters, who works on the SB team, responded that using Defer with some kind of re-receive is really the only way to control this precisely.

You can read my comment on his answer for a specific approach: use a secondary queue to store messages that indicate which primary messages have been deferred and need to be re-received from the main queue. You then control exactly how long you wait before retrying by setting ScheduledEnqueueTimeUtc on those secondary messages.
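A rough sketch of that defer-plus-secondary-queue idea follows. It assumes the older Microsoft.ServiceBus.Messaging client used in the question. The class name, the "test-queue-retries" queue and the doubling delay are hypothetical choices made for illustration only. Following the ordering suggested in the comments, the pointer message is sent before the original is deferred.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

// Hypothetical helper: names, queue paths and the delay formula are illustrative assumptions.
public class DeferredRetry
{
    private readonly MessageReceiver primaryReceiver; // receiver on the main queue (peek-lock)
    private readonly MessageSender retrySender;       // sender for the secondary "pointer" queue

    public DeferredRetry(MessagingFactory factory)
    {
        primaryReceiver = factory.CreateMessageReceiver("test-queue");
        retrySender = factory.CreateMessageSender("test-queue-retries");
    }

    // Call from the primary handler when processing fails with a transient error.
    public async Task DeferWithDelayAsync(BrokeredMessage failed)
    {
        // Assumed policy: 2, 4, 8, ... seconds, derived from the delivery count.
        var delay = TimeSpan.FromSeconds(Math.Pow(2, failed.DeliveryCount));

        // The pointer carries only the sequence number and becomes visible after the delay.
        var pointer = new BrokeredMessage(failed.SequenceNumber)
        {
            ScheduledEnqueueTimeUtc = DateTime.UtcNow.Add(delay)
        };

        // Send the pointer first. If this throws, the original is never deferred
        // and is simply redelivered once its lock is released.
        await retrySender.SendAsync(pointer);
        await failed.DeferAsync();
    }

    // Call from the handler on the secondary queue when a pointer message arrives.
    public async Task<BrokeredMessage> ReceiveDeferredAsync(BrokeredMessage pointer)
    {
        long sequenceNumber = pointer.GetBody<long>();
        // A deferred message can only be received by its sequence number.
        return await primaryReceiver.ReceiveAsync(sequenceNumber);
    }
}
```

The caller of ReceiveDeferredAsync still has to handle the case where the original message is no longer there (for example because it was already processed), and must complete both the pointer and the original once processing succeeds.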

Doha answered 3/2, 2014 at 22:46 Comment(10)
A secondary queue introduces some problems with atomicity and "exactly-once" processing, as well as doubling-up on the queues. Perhaps it would be better to read from the dead-letter queue and implement the delay there? - Dahlgren
I think Drew's approach would work well. The original message is never removed from the primary queue. It is marked deferred, which means it will not be picked back up unless the sequence number is used to retrieve it. The secondary queue message only contains the sequence number. Remember that brokered messaging with peek lock provides you "at least once" processing, not "exactly-once" processing. If you need exactly once you have to handle that on your own. - Ringster
Also, the approach Drew suggests allows you to control the time at which the secondary queue message "pops" using the ScheduledEnqueueTimeUtc. If you used only a dead-letter queue, then unless you control the speed at which you pull from it you are no better off. - Ringster
Although it only really achieves "at least once", the use of duplicate message detection is what I mean by "exactly-once" processing. If you re-queue a message with the same ID within the detection period (e.g. 10 minutes), the queue or topic will accept the message, but not deliver it, silently dropping it. If you change the ID to re-queue it, you now introduce a "real" duplicate which is much harder to isolate. - Dahlgren
The issue of atomicity is, as I see it, this: if you defer a message and then enqueuing the sequence number fails, you are unable to ever process this phantom message. - Dahlgren
Agreed, the true duplication is much harder to deal with. Regarding the defer queue, you would process that queue by pulling the sequence number and then processing that message from the primary; however, you are correct that this could lead to atomicity issues in that transaction support is extremely limited. See geekswithblogs.net/asmith/archive/2012/04/02/149176.aspx for more on transactions. - Ringster
Enqueue the sequence number before deferring the message. If you cannot enqueue the sequence number, then simply don't defer. The message will then be reprocessed normally, right away. If you can enqueue the sequence number, however, then you can defer the message. - Aloes
If the message lock times out and the message is reprocessed, then the worst-case scenario is that you get duplicate processing, which you should already handle, since again, the queues provide at-least-once processing, not exactly-once processing. When the sequence number pops and you try to peek the message, if it has already been processed then there's nothing to do, and if it's still there, process it. This all seems like a sound, reliable approach to me. - Aloes
What's the point of deferring the message if you're going to introduce a secondary queue? Just requeue the same message to the same queue with a ScheduledEnqueueTimeUtc. - Lornalorne
How can you requeue the exact same message but modify only ScheduledEnqueueTimeUtc? The received message is of type ServiceBusReceivedMessage, which has only getters for its properties. Creating a new ServiceBusMessage(ServiceBusReceivedMessage) resets the delivery count. - Wellstacked
R
23

Careful here, because I think you are confusing the retry feature with the automatic Complete/Abandon mechanism of the OnMessage event-driven message handling. The built-in retry mechanism comes into play when a call to the Service Bus fails. For example, if you call to set a message as complete and that call fails, then the retry mechanism would kick in. If you are processing a message and an exception occurs in your own code, that will NOT trigger a retry through the retry feature. Your question doesn't make clear whether the error comes from your code or from attempting to contact the Service Bus.

If you are indeed after modifying the retry policy that applies when an error occurs while communicating with the Service Bus, you can modify the RetryPolicy that is set on the MessageReceiver itself. There is a RetryExponential policy, which is used by default, as well as an abstract RetryPolicy class from which you can derive your own.
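As a small configuration sketch, this is roughly how a different RetryExponential could be assigned to a receiver. The class name and the backoff values are illustrative assumptions, and this only affects retries of failed calls to the Service Bus, not redelivery of abandoned messages.

```csharp
using System;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

// Hypothetical helper class; the numeric values are examples, not recommendations.
public static class RetryPolicyConfig
{
    public static MessageReceiver CreateReceiver(string connectionString, string queuePath)
    {
        var factory = MessagingFactory.CreateFromConnectionString(connectionString);
        var receiver = factory.CreateMessageReceiver(queuePath);

        // Back off between 1 and 30 seconds and give up after 5 retries.
        receiver.RetryPolicy = new RetryExponential(
            TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(30),
            5);

        return receiver;
    }
}
```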

What I think you are after is more control over what happens when you get an exception doing your processing, and you want to push off working on that message. There are a few options:

When you create your message handler you can set up OnMessageOptions. One of the properties is "AutoComplete". By default this is set to true, which means that as soon as processing of the message is completed the Complete method is called automatically. If an exception occurs then Abandon is called automatically, which is what you are seeing. By setting AutoComplete to false you are required to call Complete on your own from within the message handler. Failing to do so will cause the message lock to eventually run out, which is one of the behaviors you are looking for.

So, you could write your handler so that if an exception occurs during your processing you simply do not call Complete. The message would then remain on the queue until its lock runs out and would then become available again. The standard dead-lettering mechanism applies, and after x number of tries the message will be put into the dead-letter queue automatically.
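A minimal sketch of such a handler is shown here, assuming the Microsoft.ServiceBus.Messaging client from the question. The class name, the process delegate and the isTransient predicate are hypothetical and stand in for your own processing and error classification.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

// Hypothetical helper: "process" and "isTransient" are supplied by the caller.
public static class ManualCompletion
{
    public static void Register(
        MessageReceiver receiver,
        Func<BrokeredMessage, Task> process,
        Func<Exception, bool> isTransient)
    {
        // With AutoComplete off, the handler decides when Complete is called.
        var options = new OnMessageOptions { AutoComplete = false };

        receiver.OnMessageAsync(async message =>
        {
            try
            {
                await process(message);
                await message.CompleteAsync();
            }
            catch (Exception ex) when (isTransient(ex))
            {
                // Deliberately neither Complete nor Abandon: the message becomes
                // visible again only after its lock expires.
            }
            catch (Exception ex)
            {
                // Non-transient failure: move the message aside explicitly.
                await message.DeadLetterAsync("ProcessingFailed", ex.Message);
            }
        }, options);
    }
}
```

In this sketch the effective delay between attempts is the lock duration configured on the queue, so it is fixed rather than exponential.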

A caution of handling this way is that any type of exception will be treated this way. You really need to think about what types of exceptions are doing this and if you really want to push off processing or not. For example, if you are calling a third party system during your processing and it gives you an exception you know is transient, great. If, however, it gives you an error that you know will be a big problem then you may decide to do something else in the system besides just bailing on the message.

You could also look at the "Defer" method. A deferred message will no longer be delivered from the queue unless it is specifically pulled by its sequence number. Your code would have to remember the sequence number and pull the message itself. This isn't quite what you described, though.

Another option is to move away from the OnMessage, event-driven style of processing messages. While that style is very helpful, you don't get a lot of control over things. Instead, hook up your own processing loop and handle Abandon/Complete on your own. You'll also need to deal with some of the threading and concurrent call management that the OnMessage pattern gives you. This can be more work, but you have the ultimate in flexibility.
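A hand-written receive loop might look roughly like the following. This is a single-threaded sketch under the same Microsoft.ServiceBus.Messaging assumption. The class name, the 30-second wait and the process delegate are illustrative, and the catch block marks the place where you would choose your own failure handling.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

// Hypothetical helper: a sequential loop with no concurrency management.
public static class ManualReceiveLoop
{
    public static async Task RunAsync(
        MessageReceiver receiver,
        Func<BrokeredMessage, Task> process,
        CancellationToken cancellation)
    {
        while (!cancellation.IsCancellationRequested)
        {
            // Returns null if no message arrives within the wait time.
            BrokeredMessage message = await receiver.ReceiveAsync(TimeSpan.FromSeconds(30));
            if (message == null)
            {
                continue;
            }

            try
            {
                await process(message);
                await message.CompleteAsync();
            }
            catch (Exception)
            {
                // Decision point: abandon, defer, dead-letter, or let the lock expire.
                await message.AbandonAsync();
            }
        }
    }
}
```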

Finally, I believe the call you made to AbandonAsync didn't work because the properties you pass to it update the message's custom (user-defined) properties, not the standard properties on BrokeredMessage such as ScheduledEnqueueTimeUtc.

Ringster answered 3/2, 2014 at 21:22 Comment(3)
Mike, sorry I haven't been clear enough. This has little to do with the automatic Complete/Abandon mechanism and more to do with throttling the speed at which a previously abandoned message is retried. The issue is with the lack of delay between delivery attempts, such that long-lived (minutes, not milliseconds) transient errors in the processing of a message are not dealt with. - Dahlgren
Fair enough. Based on your statement "I've considered not abandoning the message at all and just letting the lock expire" the above mechanism of just not calling complete would work. To your larger request though I think Drew's answer below is better. - Ringster
This looks like a reasonable approach without having to write custom logic. Are there any side effects to this? - Tims
L
4

I ran into a similar issue where our order picking system is legacy and goes into maintenance mode each night.

Using the ideas in this article (https://markheath.net/post/defer-processing-azure-service-bus-message), I created a custom property that tracks how many times a message has been resubmitted, and I manually dead-letter the message after 10 tries. If the message has had fewer than 10 retries, the code clones it, increments the custom property, and sets the scheduled enqueue time of the new message.

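using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Newtonsoft.Json;

public class PickQueue
{
    // QUEUE_CONN_STRING and QUEUE_NAME are constants defined elsewhere.
    private readonly QueueClient queueClient;

    public PickQueue()
    {
        queueClient = new QueueClient(QUEUE_CONN_STRING, QUEUE_NAME);
    }

    public async Task QueueMessageAsync(int OrderId)
    {
        string body = JsonConvert.SerializeObject(OrderId);
        var message = new Message(Encoding.UTF8.GetBytes(body));
        await queueClient.SendAsync(message);
    }

    public async Task ReQueueMessageAsync(Message message, DateTime utcEnqueueTime)
    {
        // A first-time message has no "ResubmitCount" property yet, so treat it as 0.
        int previousCount = message.UserProperties.TryGetValue("ResubmitCount", out object value)
            ? (int)value
            : 0;
        int resubmitCount = previousCount + 1;
        if (resubmitCount > 10)
        {
            await queueClient.DeadLetterAsync(message.SystemProperties.LockToken);
        }
        else
        {
            // Schedule the clone (not the received message) with the incremented count.
            // The original still has to be completed, by the caller or by auto-complete,
            // so that it is not delivered again.
            Message clone = message.Clone();
            clone.UserProperties["ResubmitCount"] = resubmitCount;
            await queueClient.ScheduleMessageAsync(clone, utcEnqueueTime);
        }
    }
}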
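The utcEnqueueTime argument is left to the caller. One possible way to derive it from the resubmit count is a capped exponential delay, sketched below. The class name, the doubling factor and the cap are assumptions for illustration and are not part of the answer above.

```csharp
using System;

// Hypothetical helper: computes a capped exponential delay from a resubmit count.
public static class RetrySchedule
{
    // Delay = baseDelay * 2^resubmitCount, never longer than maxDelay.
    public static TimeSpan DelayFor(int resubmitCount, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (resubmitCount < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(resubmitCount));
        }

        double seconds = baseDelay.TotalSeconds * Math.Pow(2, resubmitCount);
        return TimeSpan.FromSeconds(Math.Min(seconds, maxDelay.TotalSeconds));
    }

    // Absolute UTC time at which the resubmitted message should become visible.
    public static DateTime EnqueueTimeFor(
        DateTime utcNow, int resubmitCount, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        return utcNow + DelayFor(resubmitCount, baseDelay, maxDelay);
    }
}
```

With a base delay of 30 seconds and a cap of one hour, a message that has already been resubmitted 3 times would wait 240 seconds, and the result could be passed as utcEnqueueTime to a method like ReQueueMessageAsync.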
Liebowitz answered 4/10, 2018 at 16:27 Comment(0)
N
1

This question asks how to implement exponential backoff in Azure Functions. If you do not want to use the built-in RetryPolicy (only available when autoComplete = false), here's the solution I've been using:

        public static async Task ExceptionHandler(IMessageSession MessageSession, string LockToken, int DeliveryCount)
        {
            if (DeliveryCount < Globals.MaxDeliveryCount)
            {
                var DelaySeconds = Math.Pow(Globals.ExponentialBackoff, DeliveryCount);
                await Task.Delay(TimeSpan.FromSeconds(DelaySeconds));
                await MessageSession.AbandonAsync(LockToken);
            }
            else
            {
                await MessageSession.DeadLetterAsync(LockToken);
            }
        }

Notwithstanding answered 23/11, 2021 at 18:17 Comment(2)
Careful with this. If your delay is longer than Service Bus LockDuration then the message will be abandoned automatically since CompleteAsync wasn't called on it. Also if using within Azure Functions you also have to worry about what happens if the function timeout is shorter than your delay. - Loser
Yes - we set the message timeout to 5 minutes, the ExponentialBackoff to 1.8, and the MaxDeliveryCount to 10. - Notwithstanding
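As a quick check of those numbers, the longest wait occurs at the last delivery count that is still below the maximum. The sketch below uses a hypothetical helper class and assumes that "message timeout" means a lock duration of 5 minutes.

```csharp
using System;

// Hypothetical helper mirroring the delay formula from the answer: factor ^ deliveryCount.
public static class BackoffCheck
{
    public static double DelaySeconds(double factor, int deliveryCount)
    {
        return Math.Pow(factor, deliveryCount);
    }

    // The handler only delays while deliveryCount < maxDeliveryCount,
    // so the longest delay is at deliveryCount = maxDeliveryCount - 1.
    public static double LongestDelaySeconds(double factor, int maxDeliveryCount)
    {
        return DelaySeconds(factor, maxDeliveryCount - 1);
    }
}
```

BackoffCheck.LongestDelaySeconds(1.8, 10) is 1.8^9, roughly 198 seconds, which stays under the 300 seconds of a 5-minute lock. A factor of 2 with a maximum of 10 would give 512 seconds and exceed it.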
I
1

I usually follow one of these approaches:

  • Abandoning the message, using the Message Lock Duration and Max Delivery Count as the delay time and maximum number of retries respectively.
  • Publishing a cloned message with the ScheduledEnqueueTimeUtc property set for the desired delay (the original message needs to be removed).
  • Publishing a cloned message to a helper queue, invoking the ScheduleMessageAsync method to provide the desired delay (the original message needs to be removed). The client needs to subscribe to both the original queue or topic subscription and the helper queue.

They are explained in this link.

Inimical answered 1/9, 2023 at 3:49 Comment(0)
