I have a DefaultMessageListenerContainer, which is (in my opinion) not scaling up. The Container is defined to listen on a queue, where 100 messages are located in.
I would expect, that the Container is going to any lengths, that the messages would be consumed as fast as it is possible (by observing the maxConcurrentConsumers configuration). So i would assume, that there are 7 concurrentConsumers. (beginning by 2 concurrentConsumers at container-startup) Some logging-information:
activeConsumerCount: 5
concurrentConsumers: 2
scheduledConsumerCount: 5
idleConsumerLimit: 1
idleTaskExecLimit: 1
maxConcurrentConsumers: 7
My Spring-config (a part of it):
<bean id="abstractMessageListenerContainer" class="my.package.structure.LoggingListenerContainer" abstract="true">
<property name="connectionFactory" ref="jmscfCee" />
<property name="maxConcurrentConsumers" value="7"/>
<property name="receiveTimeout" value="100000" />
<property name="concurrentConsumers" value="2" />
</bean>
<bean class="my.package.structure.LoggingListenerContainer" parent="abstractMessageListenerContainer">
<property name="destinationName" value="MY.QUEUE" />
<property name="messageListener" ref="myMessageListener" />
</bean>
<bean id="myMessageListener" class="my.package.structure.ListenerClass"></bean>
My Logging Container
public class LoggingListenerContainer extends DefaultMessageListenerContainer{
private static final Logger logger = Logger
.getLogger(LoggingListenerContainer.class);
@Override
protected void doInvokeListener(MessageListener listener, Message message)
throws JMSException {
logger.info("activeConsumerCount: " + this.getActiveConsumerCount());
logger.info("concurrentConsumers: " + this.getConcurrentConsumers());
logger.info("scheduledConsumerCount: " + this.getScheduledConsumerCount());
logger.info("idleConsumerLimit: " + this.getIdleConsumerLimit());
logger.info("idleTaskExecLimit: " + this.getIdleTaskExecutionLimit());
logger.info("maxConcurrentConsumers: " + this.getMaxConcurrentConsumers());
super.doInvokeListener(listener, message);
}
My Listener Class:
public class ListenerClass implements MessageListener {
public void onMessage(Message msg) {
//Do some business function
}
}
Could someone be so kind to correct my configuration or give me some tipps concerning my configuration or explain me the approach of the Container? (if i had misunderstood something)
I'm locally testing with ActiveMQ (in Production with WebSphere MQ) - if it's relevant for scalability topics.
EDIT:
<bean id="jmscfCee" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>${jmscfCee.hostName}</value>
</property>
</bean>
<bean id="jmscfCeeCachingConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory ">
<constructor-arg ref="jmscfCee" />
<property name="sessionCacheSize" value="10" />
</bean>
taskExecutor
in your container? By default, DefaultMessageListenerContainer seems to use a SimpleAsyncTaskExecutor, which should just spawn new threads for each task (as of 3.1.2), but I wonder if an older version might do something different. Also, are you examining this log output when the queue still contains many messages? IfidleTaskExecutionLimit
is low, then the container would probably kill threads it has spawned once they are no longer needed - experiment with a higher value here. – Panzer