Resubmitting a message from dead letter queue - Azure Service Bus
B

8

26

I have created a Service Bus queue in Azure and it works well. If a message is not delivered within the default number of attempts (10), it is correctly moved to the dead letter queue.

Now I would like to resubmit this message from the dead letter queue back to the queue where it originated and see if it works again. I tried this using Service Bus Explorer, but the message is moved to the dead letter queue again immediately.

Is it possible to resubmit the message, and if so, how?

Bikini answered 23/1, 2017 at 2:55 Comment(0)
A
5

The Service Bus Explorer tool always creates a clone of the original message when you repair and resubmit a message from the deadletter queue. It could not be any different, as by default Service Bus messaging does not provide any message repair and resubmit mechanism. I suggest you investigate why your message, as well as its clone, ends up in the deadletter queue when you resubmit it. Hope this helps!

Approval answered 23/1, 2017 at 7:38 Comment(1)
The original message was dead-lettered because delivery was attempted 10 times and, for some reason, the service was not available then. The service is back up now, but the clone of the message shows the same error as the original, and after a single try its count is already 11. So I assume the delivery count is not reset in the cloned message. - Bikini
K
17

We had a batch of around 60k messages that needed to be reprocessed from the dead letter queue. Peeking and sending the messages back via Service Bus Explorer took around 6 minutes per 1k messages from my machine. I solved the issue by setting a forward rule for DLQ messages to another queue and auto-forwarding them from there to the original queue. This solution took around 30 seconds for all 60k messages.

Kraal answered 30/10, 2020 at 16:1 Comment(5)
Could you please share some links with steps for setting auto-forward on the DLQ to send to a new topic? - Suasion
Don't forget to remove the forwarding after reprocessing the deadletter messages. - Expectancy
Using C#:
    var client = new ServiceBusAdministrationClient(connectionString);
    var sub = client.GetSubscriptionAsync(topicName, subscriptionName).Result;
    sub.Value.ForwardDeadLetteredMessagesTo = "myqueue";
    sub = client.UpdateSubscriptionAsync(sub.Value).Result;
Using PowerShell and Az.ServiceBus:
    $sub = Get-AzServiceBusSubscription -ResourceGroupName MyRG -Namespace MyNS -Topic MyT -Subscription MyS
    $sub.ForwardDeadLetteredMessagesTo = "myqueue"
    Set-AzServiceBusSubscription -ResourceGroupName MyRG -Namespace MyNS -Topic MyT -InputObject $sub
- Galloway
The feature to forward from one queue to another is not available in the Basic tier. Just something to note before people try it. - Guidepost
Wow, this saved me years of work. - Madi
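For the queue-to-queue case this answer describes, the setup could look roughly like the Python sketch below. It is only a sketch under assumptions: `admin` is expected to be a `ServiceBusAdministrationClient` from `azure.servicebus.management`, the helper name `chain_dlq_forwarding` and the queue names are made up for illustration, and the staging queue is assumed to exist already.

```python
def chain_dlq_forwarding(admin, source_queue, staging_queue):
    """Route dead letters of source_queue through staging_queue and back.

    `admin` is assumed to behave like
    azure.servicebus.management.ServiceBusAdministrationClient.
    """
    # Step 1: everything that lands in the staging queue is auto-forwarded
    # to the original queue.
    staging = admin.get_queue(staging_queue)
    staging.forward_to = source_queue
    admin.update_queue(staging)

    # Step 2: dead letters of the original queue are forwarded to the
    # staging queue (forwarding straight back to the source is avoided here,
    # following the answer above).
    source = admin.get_queue(source_queue)
    source.forward_dead_lettered_messages_to = staging_queue
    admin.update_queue(source)


# Hypothetical usage (names are placeholders):
# from azure.servicebus.management import ServiceBusAdministrationClient
# admin = ServiceBusAdministrationClient.from_connection_string(CONNECTION_STRING)
# chain_dlq_forwarding(admin, "orders", "orders-staging")
```

As the comments above point out, the forwarding should be removed again once reprocessing is done, and auto-forwarding is not available in the Basic tier.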
G
17

We regularly need to resubmit messages. The answer from @Baglay-Vyacheslav helped a lot. I've pasted some updated C# code that works with the latest Azure.Messaging.ServiceBus NuGet package.

It makes processing the DLQ of both queues and topic subscriptions much quicker and easier.

using Azure.Messaging.ServiceBus;
using System.Collections.Generic;
using System.Threading.Tasks;
using NLog;

namespace ServiceBus.Tools
{
    class TransferDeadLetterMessages
    {
        // https://github.com/Azure/azure-sdk-for-net/blob/Azure.Messaging.ServiceBus_7.2.1/sdk/servicebus/Azure.Messaging.ServiceBus/README.md

        private static Logger logger = LogManager.GetCurrentClassLogger();

        private static ServiceBusClient client;
        private static ServiceBusSender sender;
    
        public static async Task ProcessTopicAsync(string connectionString, string topicName, string subscriberName, int fetchCount = 10)
        {
            try
            {
                client = new ServiceBusClient(connectionString);
                sender = client.CreateSender(topicName);

                ServiceBusReceiver dlqReceiver = client.CreateReceiver(topicName, subscriberName, new ServiceBusReceiverOptions
                {
                    SubQueue = SubQueue.DeadLetter,
                    ReceiveMode = ServiceBusReceiveMode.PeekLock
                });

                await ProcessDeadLetterMessagesAsync($"topic: {topicName} -> subscriber: {subscriberName}", fetchCount, sender, dlqReceiver);
            }
            catch (Azure.Messaging.ServiceBus.ServiceBusException ex)
            {
                if (ex.Reason == Azure.Messaging.ServiceBus.ServiceBusFailureReason.MessagingEntityNotFound)
                {
                    logger.Error(ex, $"Topic:Subscriber '{topicName}:{subscriberName}' not found. Check that the name provided is correct.");
                }
                else
                {
                    throw;
                }
            }
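            finally
            {
                // Either field may still be null if construction failed above.
                if (sender != null) await sender.CloseAsync();
                if (client != null) await client.DisposeAsync();
            }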
        }

        public static async Task ProcessQueueAsync(string connectionString, string queueName, int fetchCount = 10)
        {         
            try
            {
                client = new ServiceBusClient(connectionString);
                sender = client.CreateSender(queueName);

                ServiceBusReceiver dlqReceiver = client.CreateReceiver(queueName, new ServiceBusReceiverOptions
                {
                    SubQueue = SubQueue.DeadLetter,
                    ReceiveMode = ServiceBusReceiveMode.PeekLock
                });

                await ProcessDeadLetterMessagesAsync($"queue: {queueName}", fetchCount, sender, dlqReceiver);
            }
            catch (Azure.Messaging.ServiceBus.ServiceBusException ex)
            {
                if (ex.Reason == Azure.Messaging.ServiceBus.ServiceBusFailureReason.MessagingEntityNotFound)
                {
                    logger.Error(ex, $"Queue '{queueName}' not found. Check that the name provided is correct.");
                }
                else
                {
                    throw;
                }
            }
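            finally
            {
                // Either field may still be null if construction failed above.
                if (sender != null) await sender.CloseAsync();
                if (client != null) await client.DisposeAsync();
            }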
        }

        private static async Task ProcessDeadLetterMessagesAsync(string source, int fetchCount, ServiceBusSender sender, ServiceBusReceiver dlqReceiver)
        {
            var wait = new System.TimeSpan(0, 0, 10);

            logger.Info($"fetching messages ({wait.TotalSeconds} seconds retrieval timeout)");
            logger.Info(source);

            IReadOnlyList<ServiceBusReceivedMessage> dlqMessages = await dlqReceiver.ReceiveMessagesAsync(fetchCount, wait);

            logger.Info($"dl-count: {dlqMessages.Count}");

            int i = 1;

            foreach (var dlqMessage in dlqMessages)
            {
                logger.Info($"start processing message {i}");
                logger.Info($"dl-message-dead-letter-message-id: {dlqMessage.MessageId}");
                logger.Info($"dl-message-dead-letter-reason: {dlqMessage.DeadLetterReason}");
                logger.Info($"dl-message-dead-letter-error-description: {dlqMessage.DeadLetterErrorDescription}");

                ServiceBusMessage resubmittableMessage = new ServiceBusMessage(dlqMessage);

                await sender.SendMessageAsync(resubmittableMessage);

                await dlqReceiver.CompleteMessageAsync(dlqMessage);

                logger.Info($"finished processing message {i}");
                logger.Info("--------------------------------------------------------------------------------------");

                i++;
            }

            await dlqReceiver.CloseAsync();

            logger.Info($"finished");
        }
    }
}
Gloze answered 3/8, 2021 at 8:21 Comment(3)
This code will publish the dead-lettered message to the whole topic, so if your topic has multiple subscriptions, all of them will receive the resubmitted message, even if it was dead-lettered by only one subscription. ASB does not support sending messages to specific subscriptions. - Acclaim
I think this scenario can be addressed with rules on messages. Add a filter to the topic subscriptions that excludes messages carrying a particular property, and update the code to add that property to the messages being resubmitted. Alternatively, ensure the topic subscription code is idempotent. - Gloze
This is great, we had about 70k deadletter messages due to a massive outage. Would have taken hours upon hours to resubmit those. Thank you! - Likelihood
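A rough Python sketch of the rule-based approach suggested in this comment. The property name `resubmit_target` and both helper names are invented for illustration. The filter string uses Service Bus SQL filter syntax and would have to be installed as a rule on each subscription in place of the default match-all rule, which is not shown here.

```python
RESUBMIT_PROPERTY = "resubmit_target"  # hypothetical property name


def resubmit_properties(original_properties, target_subscription):
    """Copy the application properties and tag the copy with its target subscription."""
    props = {}
    for key, value in (original_properties or {}).items():
        # Keys of received messages may arrive as bytes; normalise them to str.
        name = key.decode("utf-8") if isinstance(key, bytes) else key
        props[name] = value
    props[RESUBMIT_PROPERTY] = target_subscription
    return props


def subscription_filter(subscription_name):
    """SQL filter text: accept untagged messages and resubmissions addressed to this subscription."""
    return (
        f"{RESUBMIT_PROPERTY} IS NULL "
        f"OR {RESUBMIT_PROPERTY} = '{subscription_name}'"
    )
```

With a filter like this on every subscription, a message resubmitted with `resubmit_target` set to one subscription name would be skipped by the others, while ordinary messages without the property still reach all of them.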
S
13

You'd need to send a new message with the same payload. ASB by design doesn't support message resubmission.

Secondbest answered 23/1, 2017 at 3:3 Comment(3)
Azure now has a "Re-send selected messages" button in Service Bus Explorer. - Schuman
That's a tool (SB Explorer in the portal). My answer is still accurate as it's talking about the service itself, @CristianTeodorov. - Secondbest
That's a somewhat pedantic response to a comment on a terse answer. - Hotze
O
7

Try removing the dead letter reason and error description from the cloned message:

resubmittableMessage.Properties.Remove("DeadLetterReason");
resubmittableMessage.Properties.Remove("DeadLetterErrorDescription");

Full code:

using Microsoft.ServiceBus.Messaging;
using System.Transactions;

namespace ResubmitDeadQueue
{
    class Program
    {
        static void Main(string[] args)
        {

            var connectionString = "";
            var queueName = "";

            var queue = QueueClient.CreateFromConnectionString(connectionString, QueueClient.FormatDeadLetterPath(queueName), ReceiveMode.PeekLock);

            BrokeredMessage originalMessage;
            var client = QueueClient.CreateFromConnectionString(connectionString, queueName);
            do
            {
                originalMessage = queue.Receive();
                if (originalMessage != null)
                {
                    using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
                    {
                        // Create new message
                        var resubmittableMessage = originalMessage.Clone();

                        // Remove dead letter reason and description
                        resubmittableMessage.Properties.Remove("DeadLetterReason");
                        resubmittableMessage.Properties.Remove("DeadLetterErrorDescription");

                        // Resend cloned DLQ message and complete original DLQ message
                        client.Send(resubmittableMessage);
                        originalMessage.Complete();

                        // Complete transaction
                        scope.Complete();
                    }
                }
            } while (originalMessage != null);
        }
    }
}

Thanks to some other responses here!
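The same clean-up step can be sketched in Python for use with the azure-servicebus package. This is an illustration only: the helper name `clean_application_properties` is made up, and it assumes the two dead-letter fields appear among the application properties of the received message under the same names as in the C# code above.

```python
DEAD_LETTER_KEYS = {"DeadLetterReason", "DeadLetterErrorDescription"}


def clean_application_properties(properties):
    """Return a copy of the properties without the dead-letter bookkeeping fields."""
    cleaned = {}
    for key, value in (properties or {}).items():
        # Keys of received messages may arrive as bytes; normalise them to str.
        name = key.decode("utf-8") if isinstance(key, bytes) else key
        if name not in DEAD_LETTER_KEYS:
            cleaned[name] = value
    return cleaned
```

The result can then be passed as `application_properties` when building the new `ServiceBusMessage`, so custom properties survive the resubmission while the dead-letter reason and description do not.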

Originate answered 23/1, 2020 at 10:40 Comment(1)
I tried the exact code with 0 messages in the main queue and 1 message in the DLQ. After running the code, I don't see any message in either the main queue or the DLQ. - Mckinnie
H
4

It may be "duplicate message detection" as Peter Berggreen indicated. More likely, if you are directly moving the BrokeredMessage from the dead letter queue to the live queue, the DeliveryCount is still at its maximum and the message returns to the dead letter queue.

Pull the BrokeredMessage off the dead letter queue, get the content using GetBody(), create a new BrokeredMessage with that data and send it to the queue. You can do this in a safe manner by using peek to get the message content off the dead letter queue and then sending the new message to the live queue before removing the message from the dead letter queue. That way you won't lose any crucial data if for some reason it fails to write to the live queue.

With a new BrokeredMessage you should not have an issue with "duplicate message detection" and the DeliveryCount will be reset to zero.
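The ordering described above (send the new message first, remove the dead letter only afterwards) could be sketched in Python as follows. The function name `resubmit_dead_letters` and the `build_message` callback are made up for illustration. `receiver` and `sender` are assumed to behave like the azure-servicebus receiver and sender objects, with the receiver opened on the dead letter queue in peek-lock mode.

```python
def resubmit_dead_letters(receiver, sender, build_message, batch_size=10, max_wait_time=5):
    """Copy dead letters to the live queue, completing each one only after its copy was sent.

    Returns the number of messages moved. If sending raises, the exception
    propagates and the dead letter is left uncompleted, so it stays in the DLQ.
    """
    moved = 0
    while True:
        batch = receiver.receive_messages(
            max_message_count=batch_size, max_wait_time=max_wait_time
        )
        if not batch:
            return moved  # nothing left within the wait time
        for dead in batch:
            # 1. Send a brand-new message built from the dead letter.
            sender.send_messages(build_message(dead))
            # 2. Only now remove the original from the dead letter queue.
            receiver.complete_message(dead)
            moved += 1


# Hypothetical usage with azure-servicebus:
# from azure.servicebus import ServiceBusMessage
# resubmit_dead_letters(receiver, sender,
#                       lambda dead: ServiceBusMessage(b"".join(dead.body)))
```

Because the new message is a fresh object rather than a clone, its delivery count starts over.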

Hypergolic answered 24/10, 2017 at 15:35 Comment(2)
Couldn't you achieve the safety by receiving the message off the dead letter queue but not consuming it until after successfully writing to the live queue? Seems like it would be a bit simpler implementation. - Hurff
Yes, the "receiving and not consuming" is essentially peeking at the message. - Hypergolic
R
2

It sounds like it could be related to ASB's "duplicate message detection" functionality.

When you resubmit a message in ServiceBus Explorer it will clone the message and thereby the new message will have the same Id as the original message in the deadletter queue.

If you have enabled "Requires Duplicate Detection" on the queue/topic and you try to resubmit the message within the "Duplicate Detection History Time Window", then the message will immediately be moved to the deadletter queue again.

If you want to use Service Bus Explorer to resubmit deadletter messages, then I think that you will have to disable "Requires Duplicate Detection" on the queue/topic.
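If you resubmit from your own code instead, an alternative to disabling duplicate detection is to give the resubmitted message a new message ID, since duplicate detection keys on the message ID. A minimal Python sketch, where the helper name `fresh_message_fields` and the `original_message_id` property are hypothetical:

```python
import uuid


def fresh_message_fields(original_message_id):
    """Keyword arguments for a resubmitted message that carries a new message ID.

    The original ID is kept as an application property so the message
    can still be traced back to the dead letter it came from.
    """
    return {
        "message_id": str(uuid.uuid4()),
        "application_properties": {"original_message_id": original_message_id},
    }


# Hypothetical usage with azure-servicebus:
# from azure.servicebus import ServiceBusMessage
# new_msg = ServiceBusMessage(b"".join(dead.body),
#                             **fresh_message_fields(dead.message_id))
```

This deliberately sidesteps duplicate detection for the resubmitted message, so it is only appropriate when the consumer can safely process that message again.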

Rill answered 28/8, 2017 at 8:22 Comment(0)
M
0

Here is sample Python code developed to resubmit messages from a dead letter queue (DLQ) to a normal queue (not a topic) in Azure Service Bus. The queue in question had run completely out of space and was not accepting new submissions.

The code resubmits the messages in small batches (currently set at 10) and removes each message from the DLQ as soon as it has been successfully resubmitted, so that space is freed as the run progresses.

"""
Reads all DLQ messages from a Service Bus Queue and resubmits them to the original queue
"""  

# python -m pip install --upgrade pip
# python -m pip install --upgrade wheel
# python -m pip install --upgrade setuptools
# python -m pip install --upgrade azure-servicebus

import logging
from azure.servicebus import ServiceBusClient, ServiceBusMessage

# Your Service Bus connection string and queue name
CONNECTION_STRING = "SAS Connection String goes here"
QUEUE_NAME = "target-queue"

if __name__ == "__main__":

    print("Reads all DLQ messages from a Service Bus Queue and resubmits them to the original queue")

    logging.basicConfig(level=logging.WARNING)

    try:
        with ServiceBusClient.from_connection_string(CONNECTION_STRING) as client:
            with client.get_queue_sender(queue_name=QUEUE_NAME) as sender:
                with client.get_queue_receiver(queue_name=f"{QUEUE_NAME}/$DeadLetterQueue") as receiver:

                    # Loop to ensure all messages are processed
                    while True:
                        dead_letter_messages = receiver.receive_messages(max_message_count=10, max_wait_time=5)
                        if not dead_letter_messages:
                            break  # Exit loop if no more messages

                        # Log progress by writing the number of messages received to the console
                        print(f"Received {len(dead_letter_messages)} messages from DLQ")

                        for message in dead_letter_messages:
                            try:
                                # Join the body (a generator of bytes chunks) into a single bytes object
                                body_bytes = b''.join(message.body)

                                # Create new ServiceBusMessage
                                resubmitted_message = ServiceBusMessage(body=body_bytes)

                                # Resubmit the message for processing
                                sender.send_messages(resubmitted_message)

                                # Complete the message from the DLQ
                                receiver.complete_message(message)

                                logging.info("Message resubmitted for processing and removed from DLQ")
                            except Exception as e:
                                logging.error("Error processing message: %s", e)

        logging.info("Completed processing DLQ messages.")
    except Exception as e:
        logging.error("An error occurred: %s", e)


    print("Done")

The CONNECTION_STRING value is the Primary or Secondary Connection String of a Shared Access Policy (SAS), defined either on the Service Bus namespace or directly on the queue.

This saved me a lot of time and might help someone else.

Marcimarcia answered 3/4 at 22:42 Comment(0)
