Signal a rollback from a JMS MessageListener
I've been working with JMS and ActiveMQ, and everything is working well. I am not using Spring, nor can I.

The interface javax.jms.MessageListener has only one method, onMessage. An implementation of it may throw an exception. If an exception is thrown, I consider the message not properly processed, and it needs to be retried. So I need ActiveMQ to wait a little while and then retry, i.e. I need the thrown exception to roll back the JMS transaction.

How can I accomplish such a behaviour?

Maybe there is some configuration in ActiveMQ I wasn't able to find.

Or... maybe I could do away with registering MessageListeners on consumers and consume the messages myself, in a loop like:

while (true) {
    // ... some administrative stuff like ...
    session = connection.createSession(true, Session.SESSION_TRANSACTED);
    try {
        // receive(timeout) returns null if nothing arrives within the timeout
        Message m = receiver.receive(1000L);
        if (m != null) {
            theMessageListener.onMessage(m);
        }
        session.commit();
    } catch (Exception e) {
        session.rollback();
        Thread.sleep(someTimeDefinedSomewhereElse);
    }
    // ... some more administrative stuff
}

in a couple of threads, instead of registering the listener.

Or... I could somehow decorate/AOP/byte-manipulate the MessageListeners to do this themselves.

What route would you take and why?

note: I don't have full control over the MessageListeners code.

EDIT: A test as a proof of concept:

@Test
@Ignore("Interactive test, just a proof of concept")
public void transaccionConListener() throws Exception {
    final AtomicInteger atomicInteger = new AtomicInteger(0);

    BrokerService brokerService = new BrokerService();

    String bindAddress = "vm://localhost";
    brokerService.addConnector(bindAddress);
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
    brokerService.setUseJmx(false);
    brokerService.start();

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(500);
    redeliveryPolicy.setBackOffMultiplier(2);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveries(2);

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
    activeMQConnectionFactory.setUseRetroactiveConsumer(true);
    activeMQConnectionFactory.setClientIDPrefix("ID");
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);

    pooledConnectionFactory.start();

    Connection connection = pooledConnectionFactory.createConnection();
    Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
    Queue helloQueue = session.createQueue("Hello");
    MessageConsumer consumer = session.createConsumer(helloQueue);
    consumer.setMessageListener(new MessageListener() {

        @Override
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                switch (atomicInteger.getAndIncrement()) {
                    case 0:
                        System.out.println("OK, first message received " + textMessage.getText());
                        message.acknowledge();
                        break;
                    case 1:
                        System.out.println("NOPE, second must be retried " + textMessage.getText());
                        throw new RuntimeException("I failed, aaaaah");
                    case 2:
                        System.out.println("OK, second message received " + textMessage.getText());
                        message.acknowledge();
                }
            } catch (JMSException e) {
                e.printStackTrace(System.out);
            }
        }
    });
    connection.start();

    {
        // A client sends two messages...
        Connection connection1 = pooledConnectionFactory.createConnection();
        Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection1.start();

        MessageProducer producer = session1.createProducer(helloQueue);
        producer.send(session1.createTextMessage("Hello World 1"));
        producer.send(session1.createTextMessage("Hello World 2"));

        producer.close();
        session1.close();
        connection1.stop();
        connection1.close();
    }
    JOptionPane.showInputDialog("I will wait, you watch the log...");

    consumer.close();
    session.close();
    connection.stop();
    connection.close();
    pooledConnectionFactory.stop();

    brokerService.stop();

    assertEquals(3, atomicInteger.get());
}
Deepfreeze answered 27/8, 2011 at 10:57 Comment(1)
Thank you very much, whaley and @Ammar, for your answers. I am upvoting both since you both pointed me onto the right track, but I am not accepting an answer yet because more testing is needed. - Deepfreeze
F
12

If you want to use SESSION_TRANSACTED as your acknowledgement mode, then you need to set up a RedeliveryPolicy on your Connection/ConnectionFactory. This page on ActiveMQ's website also contains some good info on what you might need to do.

Since you aren't using Spring, you can set up a RedeliveryPolicy with something similar to the following code (taken from one of the above links):

RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(2);
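
To make the numbers in that policy concrete, here is a minimal sketch of the retry schedule they imply. It is a simplified model and not ActiveMQ's actual implementation: the class `BackOffSchedule` and its `delays` method are hypothetical names, and the model assumes the first redelivery waits the initial delay and each later one waits the previous delay times the multiplier (it ignores options such as collision avoidance or a maximum delay).

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of an exponential back-off schedule (hypothetical helper, not an ActiveMQ class). */
final class BackOffSchedule {

    /**
     * Returns the delay in milliseconds before each redelivery attempt.
     * The first redelivery waits initialDelayMs; every later one waits
     * the previous delay multiplied by multiplier.
     */
    static List<Long> delays(long initialDelayMs, double multiplier, int maximumRedeliveries) {
        List<Long> result = new ArrayList<>();
        long delay = initialDelayMs;
        for (int attempt = 1; attempt <= maximumRedeliveries; attempt++) {
            result.add(delay);
            delay = (long) (delay * multiplier);
        }
        return result;
    }

    public static void main(String[] args) {
        // Same numbers as the policy above: 500 ms initial delay, multiplier 2, 2 redeliveries.
        System.out.println(delays(500, 2, 2)); // prints [500, 1000]
    }
}
```

Under this model a failing message is delivered three times in total (the original delivery plus two redeliveries). Once the maximum is exceeded, ActiveMQ stops redelivering and moves the message to a dead letter queue.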

Edit: Taking the code snippet you added to the question, the following shows how this works with transactions. Try this code with the Session.rollback() call commented out, and you'll see that using SESSION_TRANSACTED with Session.commit()/rollback() works as expected:

@Test
public void test() throws Exception {
    final AtomicInteger atomicInteger = new AtomicInteger(0);

    BrokerService brokerService = new BrokerService();

    String bindAddress = "vm://localhost";
    brokerService.addConnector(bindAddress);
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
    brokerService.setUseJmx(false);
    brokerService.start();

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(500);
    redeliveryPolicy.setBackOffMultiplier(2);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveries(2);

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
    activeMQConnectionFactory.setUseRetroactiveConsumer(true);
    activeMQConnectionFactory.setClientIDPrefix("ID");

    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);

    pooledConnectionFactory.start();

    Connection connection = pooledConnectionFactory.createConnection();
    final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    Queue helloQueue = session.createQueue("Hello");
    MessageConsumer consumer = session.createConsumer(helloQueue);
    consumer.setMessageListener(new MessageListener() {

        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                switch (atomicInteger.getAndIncrement()) {
                    case 0:
                        System.out.println("OK, first message received " + textMessage.getText());
                        session.commit();
                        break;
                    case 1:
                        System.out.println("NOPE, second must be retried " + textMessage.getText());
                        session.rollback();
                        throw new RuntimeException("I failed, aaaaah");
                    case 2:
                        System.out.println("OK, second message received " + textMessage.getText());
                        session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace(System.out);
            }
        }
    });
    connection.start();

    {
        // A client sends two messages...
        Connection connection1 = pooledConnectionFactory.createConnection();
        Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection1.start();

        MessageProducer producer = session1.createProducer(helloQueue);
        producer.send(session1.createTextMessage("Hello World 1"));
        producer.send(session1.createTextMessage("Hello World 2"));

        producer.close();
        session1.close();
        connection1.stop();
        connection1.close();
    }
    JOptionPane.showInputDialog("I will wait, you watch the log...");

    consumer.close();
    session.close();
    connection.stop();
    connection.close();
    pooledConnectionFactory.stop();

    assertEquals(3, atomicInteger.get());
}

Furor answered 27/8, 2011 at 13:0 Comment(4)
That didn't work, but it pointed me in the right direction. I will stay with DUPS_OK_ACKNOWLEDGE, since it seems to be the mode that works with the least effort on my part. - Deepfreeze
You need to paste the entirety of your code, because you aren't doing something correctly with your Session. DUPS_OK_ACKNOWLEDGE only appears to be working because client acknowledgement is lazy and the broker will just keep resending messages until the client eventually does ack. - Furor
I pasted a proof of concept. I can only make it work with DUPS_OK_ACKNOWLEDGE, and calling message.acknowledge() doesn't seem to make a difference. - Deepfreeze
Thank you so much. Yep, I will do a try { legacyListener.onMessage(message); session.commit(); } catch (Throwable t) { session.rollback(); } as I register the listeners. - Deepfreeze
L
2

You need to set the acknowledgment mode to Session.CLIENT_ACKNOWLEDGE. In that mode, the client acknowledges a consumed message by calling the message's acknowledge method.

QueueSession session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

Then, after processing the message, you need to call the Message.acknowledge() method in order to remove that message from the queue.

Message message = ...;
// Processing message

message.acknowledge();
Lazos answered 27/8, 2011 at 11:10 Comment(4)
It does not work. onMessage still gets called only once, even if message.acknowledge() never gets called. - Deepfreeze
Have you set the acknowledgment mode properly? It must be set to Session.CLIENT_ACKNOWLEDGE! - Lazos
But it works with (false, Session.DUPS_OK_ACKNOWLEDGE)... message.acknowledge() does not seem to do the trick. - Deepfreeze
If he's using CLIENT_ACKNOWLEDGE then he needs to use Session.recover(), not Session.rollback(). - Furor
H
2

A little late, but here goes -

I would not use a MessageListener, but rather global thread pools to manage listening and processing.

ListeningPool -> listener -> submit processing task -> ProcessingPool -> execute and acknowledge or close without acknowledgment.

  1. Maintain two thread pools, one for listeners and one for processors.
  2. Have a listening Runnable implementation that polls a queue in a while (true) loop using consumer.receive(timeout). In the finally block, close the connection, session and consumer if no message was received. If a message is received, submit a task to the processing pool, passing it the connection, session, message and consumer.
  3. Have a processing implementation that takes in the message, connection, session and consumer. Do your processing and acknowledge if all is OK. If not, close without acknowledging. This triggers a redelivery according to your server's redelivery policy.
  4. Initialize your listening pool with all the listener tasks, each listening on one queue. Initialize your processing pool with parameters suited to your application's runtime.
public class CustomMessageListener implements Runnable {

    private ConnectionFactory connectionFactory;
    private MessageProcessor processor;
    private long backOff;
    private volatile boolean stopped = false; // volatile so stop() is seen by the run() thread
    private Executor processPool;

    public CustomMessageListener(ConnectionFactory connectionFactory,
        long backOff, MessageProcessor processor, Executor processPool) {
        this.connectionFactory = connectionFactory;
        this.backOff = backOff;
        this.processor = processor;
        this.processPool = processPool;
    }

    @Override
    public void run() {
        while (!stopped) {
            listen();
        }
    }

    public void stop() {
        this.stopped = true;
    }

    public void listen() {
        Connection c = null;
        Session s = null;
        MessageConsumer consumer = null;
        boolean received = false;
        try {
            c = connectionFactory.createConnection();
            s = c.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            consumer = s.createConsumer(...);
            Message message = consumer.receive(backOff); // waits maximum backOff ms for a message
            if (message != null) {
                received = true;
                // submit a task to processing pool...
                processPool.execute(processor.process(message, s, consumer, c));
            }
        } catch (JMSException ex) {
            // log your exception
        } finally {
            if (!received) {
                // close conn, session, consumer
            }
        }
    }
}

public class MessageProcessor {

    public Runnable process(Message msg, Session s, MessageConsumer consumer, Connection conn) {
        return () -> {
            try {
                //do your processing
                msg.acknowledge(); // done
            } catch (JMSException ex) {
                // log your exception
            } finally {
                // close your resources
            }
        };
    }
}

You can call stop() to stop listening for more messages, for a graceful shutdown. Include a queueName in the constructor to listen for a particular queue.
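
The hand-off between the two pools and the graceful stop can be tried without a broker. The following is only a sketch under stated assumptions: a `BlockingQueue<String>` stands in for the JMS consumer, acknowledgement and redelivery are not modelled, and the names `PollingListener` and `TwoPoolDemo` are hypothetical.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Polls a queue on its own thread and hands each message to a separate processing pool. */
final class PollingListener implements Runnable {
    private final BlockingQueue<String> source; // stand-in for a JMS MessageConsumer (assumption)
    private final ExecutorService processPool;
    private final long timeoutMs;
    private final Consumer<String> processor;
    private volatile boolean stopped = false;

    PollingListener(BlockingQueue<String> source, ExecutorService processPool,
                    long timeoutMs, Consumer<String> processor) {
        this.source = source;
        this.processPool = processPool;
        this.timeoutMs = timeoutMs;
        this.processor = processor;
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                // Wait at most timeoutMs, so the stopped flag is re-checked regularly.
                String message = source.poll(timeoutMs, TimeUnit.MILLISECONDS);
                if (message != null) {
                    processPool.execute(() -> processor.accept(message));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void stop() {
        stopped = true;
    }
}

final class TwoPoolDemo {
    /** Feeds three messages through the pools and returns what was processed. */
    static List<String> runDemo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("m1", "m2", "m3"));
        List<String> processed = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        ExecutorService listenPool = Executors.newSingleThreadExecutor();
        ExecutorService processPool = Executors.newFixedThreadPool(2);
        PollingListener listener = new PollingListener(queue, processPool, 100,
                m -> { processed.add(m); done.countDown(); });
        try {
            listenPool.execute(listener);
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for messages");
            }
            listener.stop(); // graceful shutdown: no more polling after the current wait
            listenPool.shutdown();
            listenPool.awaitTermination(5, TimeUnit.SECONDS);
            processPool.shutdown();
            processPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed;
    }
}
```

Because two processing threads run concurrently, the order of the processed messages is not guaranteed. With a real broker, the processing task would also acknowledge the message or close the session without acknowledging, as described in the steps above.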

Hint answered 12/9, 2021 at 8:9 Comment(1)
Experience tells me that a Listener is not the way to go. I would agree with your basic premise. - Deepfreeze
V
0

If your session is transacted, then the acknowledgeMode argument is ignored anyway. So just leave your session transacted and use session.commit() and session.rollback() to commit or roll back your transaction.
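
When the listener code cannot be changed and has no access to the session, one option is to wrap it at registration time. The sketch below shows the decorator idea only. To keep it free of the JMS jar, `Listener` and `TransactedSession` are hypothetical stand-ins for javax.jms.MessageListener and the commit()/rollback() part of javax.jms.Session; `TransactionalListener` and `RecordingSession` are likewise illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for javax.jms.MessageListener (assumption, avoids the JMS dependency). */
interface Listener<M> {
    void onMessage(M message);
}

/** Hypothetical stand-in for the commit/rollback part of javax.jms.Session. */
interface TransactedSession {
    void commit() throws Exception;
    void rollback() throws Exception;
}

/** Wraps a listener that cannot be modified: commit on success, roll back on failure. */
final class TransactionalListener<M> implements Listener<M> {
    private final Listener<M> delegate;
    private final TransactedSession session;

    TransactionalListener(Listener<M> delegate, TransactedSession session) {
        this.delegate = delegate;
        this.session = session;
    }

    @Override
    public void onMessage(M message) {
        try {
            delegate.onMessage(message);
            session.commit();
        } catch (Exception processingFailure) {
            try {
                session.rollback(); // hands the message back for redelivery
            } catch (Exception rollbackFailure) {
                processingFailure.addSuppressed(rollbackFailure);
            }
            // Log processingFailure here; it is deliberately not rethrown.
        }
    }
}

/** Records the calls made on it, so the behaviour can be observed without a broker. */
final class RecordingSession implements TransactedSession {
    final List<String> calls = new ArrayList<>();

    @Override
    public void commit() {
        calls.add("commit");
    }

    @Override
    public void rollback() {
        calls.add("rollback");
    }
}
```

With real JMS the wrapper would implement javax.jms.MessageListener, hold the transacted javax.jms.Session the consumer was created from, and be passed to consumer.setMessageListener(...). Session.commit() and Session.rollback() throw JMSException, so the same try/catch shape applies. The sketch catches Exception rather than Throwable; that is a design choice, so widen it if Errors from the wrapped listener should trigger a rollback as well.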

Viscose answered 25/12, 2014 at 14:16 Comment(1)
I think the (my) problem is that the session is not accessible within MessageListener.onMessage(Message). - Outgoings
