ActiveMQ how to resend/retry DLQ messages programmatically

I would like to write a simple code snippet that fetches all messages from the DLQ and re-sends them to their original destination (i.e. resend/retry).

This is easy to do in the ActiveMQ web UI, but only for a single message at a time.

Urbanize answered 10/2, 2020 at 17:43 Comment(1)
If my answer addressed your question, please mark it as correct to help other users who have this same problem in the future. Thanks! - Slander
B
6

Retrying all messages on a DLQ is already implemented in ActiveMQ as an MBean operation.

You can trigger the retryMessages operation with jmxterm or Jolokia.

For example, to replay all messages on the queue ActiveMQ.DLQ with Jolokia:

curl -XGET --user admin:admin --header "Origin: http://localhost" http://localhost:8161/api/jolokia/exec/org.apache.activemq:brokerName=localhost,destinationName=ActiveMQ.DLQ,destinationType=Queue,type=Broker/retryMessages

NOTE: You can only use this operation on a queue that is marked as a DLQ. It will not work for regular queues.

Also, a DLQ can have its 'DLQ' flag reset to false when the server is restarted. The flag is automatically set back to true when a new message is sent to the DLQ.
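The same operation can also be invoked from Java over plain JMX instead of Jolokia. The following is a minimal sketch, not a definitive implementation. It assumes the broker exposes a remote JMX connector at the address shown (an assumption, adjust it to your setup) and reuses the broker name and queue name from the curl call above. The class and method names are made up for illustration.

```java
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

final class DlqRetryViaJmx {

    /** Builds the queue MBean name that appears in the Jolokia URL above. */
    static ObjectName queueMBean(String brokerName, String queueName) {
        try {
            return new ObjectName("org.apache.activemq:type=Broker,brokerName=" + brokerName
                    + ",destinationType=Queue,destinationName=" + queueName);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid broker or queue name", e);
        }
    }

    /** Invokes the no-argument retryMessages operation and returns whatever the broker reports. */
    static Object retryAll(MBeanServerConnection connection, ObjectName queue) throws Exception {
        return connection.invoke(queue, "retryMessages", new Object[0], new String[0]);
    }

    public static void main(String[] args) throws Exception {
        // ASSUMPTION: remote JMX is enabled on the broker and reachable at this URL.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            Object result = retryAll(connector.getMBeanServerConnection(),
                    queueMBean("localhost", "ActiveMQ.DLQ"));
            System.out.println("retryMessages returned: " + result);
        }
    }
}
```

If JMX authentication is enabled on the broker, credentials have to be passed in the environment map of JMXConnectorFactory.connect(url, env).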

Bdellium answered 28/8, 2020 at 3:36 Comment(3)
This seems very useful. Is there documentation on this? Where can I find it? - Joshia
@Joshia There doesn't seem to be documentation on which MBeans are available, other than querying JMX directly. You can list the available MBeans with a tool such as jconsole. Otherwise, here's a gist with the list: gist.github.com/tallpsmith/d59acb248cd8ea4f3bf2. There is some documentation, but it's not great: activemq.apache.org/jmx - Bdellium
How do you do this in ActiveMQ Artemis? - Kluge
S
4

There is no direct JMS API for re-sending a message from a DLQ to its original queue. In fact, the JMS API doesn't even discuss dead-letter queues. It's merely a convention used by most brokers to deal with messages that can't be consumed.

You'd need to create an actual JMS consumer to receive the message from the DLQ and then create a JMS producer to send the message back to its original queue.

It's important that you use a transacted session (i.e. Session.SESSION_TRANSACTED) to avoid potential message loss or duplication.

If you use Session.AUTO_ACKNOWLEDGE and there is a problem between the time the message is consumed and sent (e.g. the application crashes or the hardware fails), then the message could be lost because it was already acknowledged before it was sent successfully.

If you use Session.CLIENT_ACKNOWLEDGE and there is a problem between the time the message is sent and acknowledged, then the message could ultimately be duplicated because it was already sent before it was acknowledged successfully.

Both operations should be part of the JMS transaction so that the work is atomic.

Lastly, I recommend you either invoke commit() on the transacted session for each message sent or after a small batch of messages (e.g. 10). Given that you have no idea how many messages are in the DLQ it would be unwise to process every message in a single transaction. Generally you want the transaction to be as small as possible in order to minimize the window during which an error might occur and the transaction's work will need to be performed again. Also, the larger the transaction is the more heap memory will be required on the broker to keep track of the work in the transaction. Keep in mind that you can invoke commit() on the same session as many times as you want. You don't need to create a new session for each transaction.
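The batching advice above can be sketched independently of any broker. The following is only an illustration under stated assumptions: the Receiver, Sender and Tx interfaces are hypothetical stand-ins (not part of JMS) for MessageConsumer.receive(timeout), MessageProducer.send(message) and Session.commit() / Session.rollback(), so that the loop can run without a broker.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class BatchedRetry {

    // Hypothetical stand-ins for the JMS calls involved (assumptions, not JMS types).
    interface Receiver<M> { M receive() throws Exception; }          // returns null once the DLQ is drained
    interface Sender<M> { void send(M message) throws Exception; }
    interface Tx { void commit() throws Exception; void rollback() throws Exception; }

    /** Moves messages from dlq to target, committing after every batchSize messages. */
    static <M> int drain(Receiver<M> dlq, Sender<M> target, Tx tx, int batchSize) throws Exception {
        int committed = 0;
        int pending = 0;
        try {
            for (M message = dlq.receive(); message != null; message = dlq.receive()) {
                target.send(message);
                if (++pending == batchSize) {
                    tx.commit();            // receive + send of this batch become permanent together
                    committed += pending;
                    pending = 0;
                }
            }
            if (pending > 0) {              // commit the final, partially filled batch
                tx.commit();
                committed += pending;
            }
        } catch (Exception e) {
            tx.rollback();                  // only the uncommitted batch is undone
            throw e;
        }
        return committed;
    }

    public static void main(String[] args) throws Exception {
        Deque<String> fakeDlq = new ArrayDeque<>();
        for (int i = 0; i < 25; i++) {
            fakeDlq.add("msg-" + i);
        }
        int[] commits = {0};
        Tx tx = new Tx() {
            public void commit() { commits[0]++; }
            public void rollback() { }
        };
        int moved = drain(fakeDlq::poll, message -> { }, tx, 10);
        System.out.println(moved + " messages in " + commits[0] + " commits"); // 25 messages in 3 commits
    }
}
```

With real JMS objects the three arguments would wrap consumer.receive(timeout), producer.send(message) and the commit() and rollback() methods of the one transacted session that created both the consumer and the producer. The same session is reused for every batch.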

Slander answered 10/2, 2020 at 17:53 Comment(3)
Should Session.SESSION_TRANSACTED be used in conjunction with session.commit()? - Urbanize
Yes. You must call commit() on the transacted session in order to actually complete the operations. - Slander
Should I create a new session for each message retry? That way I can roll back if something goes wrong while my retry consumer handles a message. If not, I will have to roll back everything if the handling of any message throws an exception (all or nothing). - Urbanize
U
1

After Justin's reply I've manually implemented the retry mechanism like so:

public void retryAllDlqMessages() throws JMSException {

        logger.warn("retryAllDlqMessages starting");

        logger.warn("Creating a connection to {}", activemqUrl);

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("test", "test", activemqUrl);

        HashMap<String, MessageProducer> messageProducersMap = new HashMap<>();
        MessageConsumer consumer = null;

        try (ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
                ActiveMQSession session = (ActiveMQSession) connection.createSession(true, Session.SESSION_TRANSACTED)) {
            String dlqName = getDlqName();
            logger.warn("Creating a session to {}", dlqName);
            ActiveMQQueue queue = (ActiveMQQueue) session.createQueue(dlqName);
            logger.warn("Starting JMS Connection");
            connection.start();

            logger.warn("Creating a DLQ consumer");
            consumer = session.createConsumer(queue);
            logger.warn("Consumer start receiving");

            Message message = consumer.receive(CONSUMER_RECEIVE_TIME_IN_MS);

            int retriedMessages = 0;
            while (message != null) {
                try {
                    retryMessage(messageProducersMap, session, message);
                    retriedMessages++;
                } catch (Exception e) {
                    // Pass the exception as the last argument so the stack trace is logged too
                    logger.error("Error calling retryMessage for message = {}", message, e);
                    logger.error("Rolling back the JMS transaction...");
                    session.rollback();
                    return;
                }
                message = consumer.receive(CONSUMER_RECEIVE_TIME_IN_MS);
            }
            logger.warn("Consumer finished retrying {} messages", retriedMessages);
            logger.warn("Committing JMS transaction of retry");
            session.commit();

        } finally {
            if (!messageProducersMap.isEmpty()) {
                logger.warn("Closing {} messageProducers in messageProducersMap", messageProducersMap.size());
                for (MessageProducer producer : messageProducersMap.values()) {
                    producer.close();
                }
            }
            if (consumer != null) {
                logger.warn("Closing DLQ Consumer");
                consumer.close();
            }
        }
    }

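    private void retryMessage(HashMap<String, MessageProducer> messageProducersMap, ActiveMQSession session, Message message) throws JMSException {

        // ActiveMQMessage is the base class of all ActiveMQ message types, so this cast also
        // works for text, bytes, map and stream messages, not only for object messages.
        ActiveMQMessage qm = (ActiveMQMessage) message;
        ActiveMQDestination originalDestination = qm.getOriginalDestination();
        if (originalDestination == null) {
            throw new JMSException("No original destination on message " + qm.getJMSMessageID());
        }
        String originalDestinationName = originalDestination.getQualifiedName();
        logger.warn("Retry message with JmsID={} to original destination {}", qm.getJMSMessageID(), originalDestinationName);

        // Reuse one producer per original destination
        MessageProducer producer = messageProducersMap.get(originalDestinationName);
        if (producer == null) {
            logger.warn("Creating a new producer for original destination: {}", originalDestinationName);
            producer = session.createProducer(originalDestination);
            messageProducersMap.put(originalDestinationName, producer);
        }
        // Exceptions are deliberately not caught here: they must reach the caller,
        // which rolls back the transaction.
        logger.info("Producing message to original destination");
        producer.send(qm);
        logger.info("Message sent");
    }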
Urbanize answered 11/2, 2020 at 12:50 Comment(6)
@JustinBertram, please take another look. I've updated my answer. - Urbanize
Since you're using Session.AUTO_ACKNOWLEDGE there's a chance for message loss. I've updated my answer with more details. - Slander
@JustinBertram thanks, I've noticed. Working on it now. - Urbanize
@JustinBertram - I've used CLIENT_ACKNOWLEDGE instead. - Urbanize
Since you're using Session.CLIENT_ACKNOWLEDGE there's a chance for message duplication. I've updated my answer with more details. - Slander
Also, it's worth noting that this solution depends on ActiveMQ implementation classes (e.g. ActiveMQObjectMessage) rather than just the pure JMS API. This means that it won't be portable to any other JMS broker. - Slander