I am trying to create a consumer-level timeout in ActiveMQ (version 5.15.0). Suppose a message is picked up by a consumer that is then unable to acknowledge it. In that case I want the consumer to time out so that the same message can be picked up by another consumer listening on the broker.
My producer code, where I also set up two consumer listeners:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsMessageListenerAckExample {
    public static void main(String[] args) throws Exception {
        Connection connection = null;
        try {
            // Producer
            ConnectionFactory factory = createActiveMQConnectionFactory();
            connection = factory.createConnection();
            // Non-transacted session; messages must be acknowledged explicitly.
            Session session = connection.createSession(false,
                    Session.CLIENT_ACKNOWLEDGE);
            Queue queue = session.createQueue("customerQueue");
            String payload = "Important Task";
            Message msg = session.createTextMessage(payload);
            MessageProducer producer = session.createProducer(queue);
            System.out.println("Sending text '" + payload + "'");
            producer.send(msg);

            // Consumer 1: receives the message but never acknowledges it.
            MessageConsumer consumer1 = session.createConsumer(queue);
            consumer1.setMessageListener(
                    new AckMessageListener(false, "consumer1"));
            Thread.sleep(1000);

            // Consumer 2: created on the same session; acknowledges what it receives.
            System.out.println("Creating new message listener to acknowledge");
            producer.send(msg);
            MessageConsumer consumer2 = session.createConsumer(queue);
            consumer2.setMessageListener(
                    new AckMessageListener(true, "consumer2"));

            // Delivery only begins once the connection is started.
            connection.start();
            Thread.sleep(3000);
            session.close();
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    private static ActiveMQConnectionFactory createActiveMQConnectionFactory() {
        // Create a connection factory.
        final ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("tcp://localhost:61616");
        // Pass the username and password.
        connectionFactory.setUserName("user");
        connectionFactory.setPassword("user");
        return connectionFactory;
    }
}
This is my consumer listener:
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

public class AckMessageListener implements MessageListener {
    private final boolean acknowledge;
    private final String consumerName;

    public AckMessageListener(boolean acknowledge, String consumerName) {
        this.acknowledge = acknowledge;
        this.consumerName = consumerName;
    }

    @Override
    public void onMessage(Message message) {
        // Only the non-acknowledging consumer tries to "release" its thread.
        boolean terminate = !acknowledge;
        try {
            System.out.println("ConsumerName=" + consumerName + ", Acknowledge=" + acknowledge);
            if (acknowledge) {
                try {
                    message.acknowledge();
                } catch (JMSException e1) {
                    e1.printStackTrace();
                }
            }
            System.out.println(message);
        } finally {
            if (terminate) {
                // Sets the interrupt flag on the thread that delivered this message.
                Thread.currentThread().interrupt();
            }
        }
    }
}
I want to simulate the case where consumer1 receives the message but does not acknowledge it, so that it times out (I am trying to release its thread by interrupting it). I expect consumer2 to then pick the message up and acknowledge it, so that the message moves from the "Messages Enqueued" state to the "Messages Dequeued" state in the broker. However, consumer2 never receives any message events.
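To make the behaviour I am expecting concrete, here is a minimal in-memory sketch of it. It does not use ActiveMQ at all, and every name in it (AckTimeoutSketch, deliver, acknowledge, the explicit nowMillis clock) is hypothetical. It only models the semantics I am after: a delivered message that is not acknowledged within a timeout goes back to the queue and can be handed to another consumer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// In-memory model only; no ActiveMQ involved and every name here is hypothetical.
class AckTimeoutSketch {

    // A message that was delivered to a consumer but not yet acknowledged.
    private static final class InFlight {
        final String body;
        final String consumer;
        final long deliveredAtMillis;

        InFlight(String body, String consumer, long deliveredAtMillis) {
            this.body = body;
            this.consumer = consumer;
            this.deliveredAtMillis = deliveredAtMillis;
        }
    }

    private final Deque<String> pending = new ArrayDeque<>();
    private final List<InFlight> inFlight = new ArrayList<>();
    private final long ackTimeoutMillis;

    AckTimeoutSketch(long ackTimeoutMillis) {
        this.ackTimeoutMillis = ackTimeoutMillis;
    }

    void send(String body) {
        pending.addLast(body);
    }

    // Hands the next pending message to the consumer, or returns null if none is available.
    String deliver(String consumer, long nowMillis) {
        requeueExpired(nowMillis);
        String body = pending.pollFirst();
        if (body != null) {
            inFlight.add(new InFlight(body, consumer, nowMillis));
        }
        return body;
    }

    // Returns true if the consumer still held the message and it was removed for good.
    boolean acknowledge(String consumer, String body, long nowMillis) {
        requeueExpired(nowMillis);
        return inFlight.removeIf(m -> m.consumer.equals(consumer) && m.body.equals(body));
    }

    // Puts every message whose acknowledgement timeout has elapsed back at the head of the queue.
    private void requeueExpired(long nowMillis) {
        Iterator<InFlight> it = inFlight.iterator();
        while (it.hasNext()) {
            InFlight m = it.next();
            if (nowMillis - m.deliveredAtMillis >= ackTimeoutMillis) {
                it.remove();
                pending.addFirst(m.body);
            }
        }
    }

    int pendingCount() {
        return pending.size();
    }

    int inFlightCount() {
        return inFlight.size();
    }

    public static void main(String[] args) {
        AckTimeoutSketch queue = new AckTimeoutSketch(1000);
        queue.send("Important Task");
        // consumer1 takes the message at t=0 and never acknowledges it.
        System.out.println("consumer1 got: " + queue.deliver("consumer1", 0));
        // Before the timeout, consumer2 finds nothing to receive.
        System.out.println("consumer2 at t=500 got: " + queue.deliver("consumer2", 500));
        // After the timeout, the same message is handed to consumer2.
        String redelivered = queue.deliver("consumer2", 1500);
        System.out.println("consumer2 at t=1500 got: " + redelivered);
        System.out.println("acknowledged: " + queue.acknowledge("consumer2", redelivered, 1600));
    }
}
```

Time is passed in explicitly instead of sleeping, so the sketch is deterministic. Whether the broker offers an equivalent per-consumer acknowledgement timeout is exactly what I am asking about.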
Am I doing something wrong? How do I achieve a consumer-level timeout with ActiveMQ?