DefaultMessageListenerContainer not scaling
Asked Answered
K

1

7

I have a DefaultMessageListenerContainer, which is (in my opinion) not scaling up. The Container is defined to listen on a queue, where 100 messages are located in.

I would expect, that the Container is going to any lengths, that the messages would be consumed as fast as it is possible (by observing the maxConcurrentConsumers configuration). So i would assume, that there are 7 concurrentConsumers. (beginning by 2 concurrentConsumers at container-startup) Some logging-information:

activeConsumerCount: 5
concurrentConsumers: 2
scheduledConsumerCount: 5
idleConsumerLimit: 1
idleTaskExecLimit: 1
maxConcurrentConsumers: 7

My Spring-config (a part of it):

<bean id="abstractMessageListenerContainer" class="my.package.structure.LoggingListenerContainer" abstract="true">
    <property name="connectionFactory" ref="jmscfCee" />
    <property name="maxConcurrentConsumers" value="7"/>
    <property name="receiveTimeout" value="100000" />
    <property name="concurrentConsumers" value="2" />
</bean>

<bean class="my.package.structure.LoggingListenerContainer" parent="abstractMessageListenerContainer">
    <property name="destinationName" value="MY.QUEUE" />
    <property name="messageListener" ref="myMessageListener" />
</bean>

<bean id="myMessageListener" class="my.package.structure.ListenerClass"></bean>

My Logging Container

public class LoggingListenerContainer extends DefaultMessageListenerContainer{

private static final Logger logger = Logger
        .getLogger(LoggingListenerContainer.class);
@Override
protected void doInvokeListener(MessageListener listener, Message message)
        throws JMSException {

    logger.info("activeConsumerCount: " + this.getActiveConsumerCount());
    logger.info("concurrentConsumers: " +  this.getConcurrentConsumers());
    logger.info("scheduledConsumerCount: " + this.getScheduledConsumerCount());
    logger.info("idleConsumerLimit: " + this.getIdleConsumerLimit());
    logger.info("idleTaskExecLimit: " + this.getIdleTaskExecutionLimit());
    logger.info("maxConcurrentConsumers: " + this.getMaxConcurrentConsumers());
    super.doInvokeListener(listener, message);
}

My Listener Class:

public class ListenerClass implements MessageListener {


    public void onMessage(Message msg) {
           //Do some business function
    }

}

Could someone be so kind to correct my configuration or give me some tipps concerning my configuration or explain me the approach of the Container? (if i had misunderstood something)

I'm locally testing with ActiveMQ (in Production with WebSphere MQ) - if it's relevant for scalability topics.

EDIT:

<bean id="jmscfCee" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL">
            <value>${jmscfCee.hostName}</value>
        </property>
</bean>

<bean id="jmscfCeeCachingConnectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory ">
    <constructor-arg ref="jmscfCee" />
    <property name="sessionCacheSize" value="10" />
</bean>
Knotweed answered 25/9, 2012 at 12:41 Comment(2)
Which version of spring-jms are you using? Have you tried setting a custom taskExecutor in your container? By default, DefaultMessageListenerContainer seems to use a SimpleAsyncTaskExecutor, which should just spawn new threads for each task (as of 3.1.2), but I wonder if an older version might do something different. Also, are you examining this log output when the queue still contains many messages? If idleTaskExecutionLimit is low, then the container would probably kill threads it has spawned once they are no longer needed - experiment with a higher value here.Panzer
my spring-jms version is 3.1.1.RELEASE. Tried to set a custom taskexecutor - without any affect. The behaviour is also the same if there are still about 2k or 3k messages on the queue. Set idleTaskExecutionLimit to 10 didn't change anything as well.Knotweed
H
5

It depends. I had a similar issue with ActiveMQ a few years back, whereby its default behaviour is heavily opimized towards high volumes (many thousands) of small messages. By default each consumer will pre-fetch messages in batches of 1000, so if you have small numbers of messages you'll probably find they have all ended up in the pre-fetch buffer of one consumer, leaving the other consumers idle.

You can tune this behaviour using a prefetch policy, either on the connection URI or in the Spring configuration if that's how you're building your connection factory.

<amq:connectionFactory id="connectionFactory" brokerURL="vm://localhost">
  <property name="prefetchPolicy">
    <amq:prefetchPolicy all="1" />
  </property>
</amq:connectionFactory>

The version of ActiveMQ I was using at the time didn't support a prefetch limit of 0 (i.e. don't prefetch, just go to the broker every time) but the documentation suggests that this is now allowed.

Hollinger answered 25/9, 2012 at 12:52 Comment(3)
If the OP is using Spring's DefaultMessageListenerContainer, then that uses plain JMS API's and does not use any anything specific to ActiveMQ - correct? I don't think this advice would apply then.Panzer
True, but ultimately you need to configure the queue name and session factory somewhere, and that will be the point where you can add broker-specific extensions. For ActiveMQ you can set the prefetch policy per destination using a queue name like MY.QUEUE?consumer.prefetchSize=1Hollinger
prefetchSize didn't change anyhting on the behaviour. added it to my brokerurl and double-checked the property in my connectionfactory. post my connectionfactory-configuration as well (in my edit), maybe this is the bottleneck...Knotweed

© 2022 - 2024 — McMap. All rights reserved.