Spring JMSTemplate receive all messages in one transaction
Asked Answered
M

2

9

I am trying to get all messages from the queue in synchronous mode using Spring JMSTemplate.receive(String) method.

The problem is that I get always only one message. Here is the code:

@Transactional
public List<Message> receiveAllFromQueue(String destination) {
  List<Message> messages = new ArrayList<Message>();
  Message message;
  while ((message = queueJmsTemplate.receive(destination)) != null) {
    messages.add(message);
  }
  return messages;
}

If I remove @Transactional annotation I get all messages, but all is done out of transaction so if later during processing these messages there is an exception the messages will be lost.

Here is a definition of my JMSTemplate bean.

<bean id="queueJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="pubSubDomain" value="false" />
    <property name="receiveTimeout" value="1" />
   <property name="sessionTransacted" value="true" />
</bean>

What I want to achieve is to have one transaction and within this transaction I want to get all pending messages.

Mathieu answered 30/7, 2013 at 11:8 Comment(0)
M
5

I will reply to myself. It looks like JMSTemplate doesn't support it. The only way to solve it temporarily is to extend JMSTemplate and add new method which uses parts of JMSTemplate. Unfortunately some methods are private so they need to be copied...

public class CustomQueueJmsTemplate extends JmsTemplateDelegate {

  public List<Message> receiveAll(String destinationName) {
    return receiveAll(destinationName, null);
  }

  public List<Message> receiveAll(final String destinationName, final String messageSelector) {
    return execute(new SessionCallback<List<Message>>() {
      @Override
      public List<Message> doInJms(Session session) throws JMSException {
        Destination destination = resolveDestinationName(session, destinationName);
        return doReceiveAll(session, destination, messageSelector);
      }
    }, true);
  }

  private List<Message> doReceiveAll(Session session, Destination destination, String messageSelector)
      throws JMSException
  {
    return doReceiveAll(session, createConsumer(session, destination, messageSelector));
  }

  private List<Message> doReceiveAll(Session session, MessageConsumer consumer) throws JMSException {
    try {
      // Use transaction timeout (if available).
      long timeout = getReceiveTimeout();
      JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager
          .getResource(getConnectionFactory());
      if (resourceHolder != null && resourceHolder.hasTimeout()) {
        timeout = resourceHolder.getTimeToLiveInMillis();
      }

      // START OF MODIFIED CODE
      List<Message> messages = new ArrayList<>();
      Message message;
      while ((message = doReceive(consumer, timeout)) != null) {
        messages.add(message);
      }
      // END OF MODIFIED CODE

      if (session.getTransacted()) {
        // Commit necessary - but avoid commit call within a JTA transaction.
        if (isSessionLocallyTransacted(session)) {
          // Transacted session created by this template -> commit.
          JmsUtils.commitIfNecessary(session);
        }
      } else if (isClientAcknowledge(session)) {
        // Manually acknowledge message, if any.
        for (Message retrievedMessages : messages) {
          retrievedMessages.acknowledge();
        }
      }
      return messages;
    }
    finally {
      JmsUtils.closeMessageConsumer(consumer);
    }
  }

  private Message doReceive(MessageConsumer consumer, long timeout) throws JMSException {
    if (timeout == RECEIVE_TIMEOUT_NO_WAIT) {
      return consumer.receiveNoWait();
    } else if (timeout > 0) {
      return consumer.receive(timeout);
    } else {
      return consumer.receive();
    }
  }

}
Mathieu answered 26/3, 2014 at 13:44 Comment(0)
O
8

The receive method of the JmsTemplate creates a new MessageConsumer every time. The second time, your transaction is not yet committed, and Spring will have prefetched a number of messages during the first receive. At that moment are no messages left to fetch, resulting in null from your receive call.

JmsTemplate in Spring has an execute method that takes a SessionCallback as parameter. This allows you to run your own code against the underlying session of the JmsTemplate. Creating only one MessageConsumer should fix your problem.

@Transactional
public List<Message> receiveAllFromQueue(String destination) {
    return jmsTemplate.execute(session -> {
        try (final MessageConsumer consumer = session.createConsumer(session.createQueue(destination))) {
            List<Message> messages = new ArrayList<>();
            Message message;
            while ((message = consumer.receiveNoWait()) != null) {
                messages.add(message);
            }
            return messages;
        }
    }, true);
}
Oneiromancy answered 27/9, 2018 at 13:56 Comment(0)
M
5

I will reply to myself. It looks like JMSTemplate doesn't support it. The only way to solve it temporarily is to extend JMSTemplate and add new method which uses parts of JMSTemplate. Unfortunately some methods are private so they need to be copied...

public class CustomQueueJmsTemplate extends JmsTemplateDelegate {

  public List<Message> receiveAll(String destinationName) {
    return receiveAll(destinationName, null);
  }

  public List<Message> receiveAll(final String destinationName, final String messageSelector) {
    return execute(new SessionCallback<List<Message>>() {
      @Override
      public List<Message> doInJms(Session session) throws JMSException {
        Destination destination = resolveDestinationName(session, destinationName);
        return doReceiveAll(session, destination, messageSelector);
      }
    }, true);
  }

  private List<Message> doReceiveAll(Session session, Destination destination, String messageSelector)
      throws JMSException
  {
    return doReceiveAll(session, createConsumer(session, destination, messageSelector));
  }

  private List<Message> doReceiveAll(Session session, MessageConsumer consumer) throws JMSException {
    try {
      // Use transaction timeout (if available).
      long timeout = getReceiveTimeout();
      JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager
          .getResource(getConnectionFactory());
      if (resourceHolder != null && resourceHolder.hasTimeout()) {
        timeout = resourceHolder.getTimeToLiveInMillis();
      }

      // START OF MODIFIED CODE
      List<Message> messages = new ArrayList<>();
      Message message;
      while ((message = doReceive(consumer, timeout)) != null) {
        messages.add(message);
      }
      // END OF MODIFIED CODE

      if (session.getTransacted()) {
        // Commit necessary - but avoid commit call within a JTA transaction.
        if (isSessionLocallyTransacted(session)) {
          // Transacted session created by this template -> commit.
          JmsUtils.commitIfNecessary(session);
        }
      } else if (isClientAcknowledge(session)) {
        // Manually acknowledge message, if any.
        for (Message retrievedMessages : messages) {
          retrievedMessages.acknowledge();
        }
      }
      return messages;
    }
    finally {
      JmsUtils.closeMessageConsumer(consumer);
    }
  }

  private Message doReceive(MessageConsumer consumer, long timeout) throws JMSException {
    if (timeout == RECEIVE_TIMEOUT_NO_WAIT) {
      return consumer.receiveNoWait();
    } else if (timeout > 0) {
      return consumer.receive(timeout);
    } else {
      return consumer.receive();
    }
  }

}
Mathieu answered 26/3, 2014 at 13:44 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.