ActiveMQ consumer level timeout

I am trying to create a consumer-level timeout in ActiveMQ (version 5.15.0). Suppose a message is picked up by a consumer that is then unable to acknowledge it. In that case I want the consumer to time out so that the same message can be picked up by another consumer listening on the same broker.

My producer code, where I also set up two consumer listeners:

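import java.net.URISyntaxException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsMessageListenerAckExample {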
  public static void main(String[] args) throws URISyntaxException, Exception {
    Connection connection = null;
    try {
      // Producer
      ConnectionFactory factory = createActiveMQConnectionFactory();
      connection = factory.createConnection();
      Session session = connection.createSession(false,
          Session.CLIENT_ACKNOWLEDGE);
      Queue queue = session.createQueue("customerQueue");
      String payload = "Important Task";
      Message msg = session.createTextMessage(payload);
      MessageProducer producer = session.createProducer(queue);

      System.out.println("Sending text '" + payload + "'");
      producer.send(msg);

      // Consumer
      MessageConsumer consumer1 = session.createConsumer(queue);
      consumer1.setMessageListener(
          new AckMessageListener(false, "consumer1"));
      Thread.sleep(1000);
      System.out.println("Creating new message listener to acknowledge");
      producer.send(msg);
      MessageConsumer consumer2 = session.createConsumer(queue);
      consumer2.setMessageListener(
          new AckMessageListener(true, "consumer2"));
      connection.start();

      Thread.sleep(3000);
      session.close();
    } finally {
      if (connection != null) {
        connection.close();
      }
    }
  }

  private static ActiveMQConnectionFactory createActiveMQConnectionFactory() {
    // Create a connection factory.
    final ActiveMQConnectionFactory connectionFactory =
        new ActiveMQConnectionFactory("tcp://localhost:61616");

    // Pass the username and password.
    connectionFactory.setUserName("user");
    connectionFactory.setPassword("user");
    return connectionFactory;
  }
}

This is my consumer listener:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

public class AckMessageListener implements MessageListener {
  private boolean acknowledge;
  private String consumerName;

  public AckMessageListener(boolean acknowledge, String consumerName) {
    this.acknowledge = acknowledge;
    this.consumerName = consumerName;
  }

  public void onMessage(Message message) {
    boolean terminate = !acknowledge;
    try {

      System.out.println("ConsumerName="+consumerName+", Acknowledge="+acknowledge);
      if (acknowledge) {
        try {
          message.acknowledge();
        } catch (JMSException e1) {
          e1.printStackTrace();
        }
      }

      System.out.println(message);
    } finally {
      if (terminate) {
        Thread.currentThread().interrupt();
      }
    }
  }
}

I want to simulate a situation in which consumer1 receives the message but does not acknowledge it, so that it times out; to do that I am trying to release the thread by interrupting it. I expect consumer2 to then pick the message up and acknowledge it, so that the message moves from the "Messages Enqueued" state to the "Messages Dequeued" state in the broker. However, consumer2 does not receive any message events.

Is there something I am doing wrong? How do I achieve a consumer-level timeout with ActiveMQ?

Endocentric answered 3/10, 2018 at 2:29 Comment(1)
This should be a good read for you: pmichaels.net/2016/10/13/acknowledging-a-message-in-active-mq - Northwester

One way of handling this would be to use transactions (http://activemq.apache.org/how-do-transactions-work.html). You call commit() on success and rollback() on failure. If you roll back, or if the session is closed before commit() is called, the message is redelivered (http://activemq.apache.org/message-redelivery-and-dlq-handling.html).
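
As a rough illustration of the commit/rollback pattern, here is a minimal sketch of a listener bound to a transacted session. The class name TransactedListener and the Handler callback are hypothetical names made up for this example; only the javax.jms types and the Session.commit() and Session.rollback() calls come from the JMS API. It assumes the session was created with connection.createSession(true, Session.SESSION_TRANSACTED) and that the consumer using this listener was created from that same session.

```java
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;

/**
 * Sketch only: commits the session's transaction when processing succeeds
 * and rolls it back when processing fails, so the broker can redeliver.
 * Assumes the given session is transacted.
 */
public class TransactedListener implements MessageListener {

  /** Hypothetical callback that holds the application's processing logic. */
  public interface Handler {
    void handle(Message message) throws Exception;
  }

  private final Session session;
  private final Handler handler;

  public TransactedListener(Session session, Handler handler) {
    this.session = session;
    this.handler = handler;
  }

  @Override
  public void onMessage(Message message) {
    try {
      handler.handle(message);
      // Success: committing acknowledges the message as part of the transaction.
      session.commit();
    } catch (Exception processingFailure) {
      try {
        // Failure: rolling back makes the message eligible for redelivery.
        session.rollback();
      } catch (JMSException rollbackFailure) {
        rollbackFailure.printStackTrace();
      }
    }
  }
}
```

It would be wired up with something like consumer.setMessageListener(new TransactedListener(session, msg -> process(msg))), where process is whatever your application does with the message. Note that this only controls whether the message is redelivered, not which consumer receives the redelivery.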

Kiri answered 3/10, 2018 at 20:31 Comment(2)
Even with a transacted session and a rollback, the same consumer still picks up the message. Is there any other configuration that I need to set? - Endocentric
OK, I misunderstood the question. I thought you wanted to preserve the message in case consumer 1 dies while processing it, and then have consumer 2 pick it up. But it seems you want to roll back from one consumer and have the second consumer pick up the message. I don't know whether there is a neat ActiveMQ way of doing this. ActiveMQ is reattempting delivery (in round-robin fashion). I am not sure what your end goal is, but would sending a copy of the same message to both consumers work for you? If so, you can use composite destinations for that. - Kiri
