I'm having a problem with ActiveMQ that I find very difficult to pinpoint and even to reproduce, so it's hard to ask a specific question. Please bear with me.
Basically I have:
producer (prioritized messages) -> queue -> consumer
Normally, there are a few hundred thousand messages in the queue, and whenever messages with a higher priority arrive, they are consumed first.
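For illustration, the producer sets the JMS priority along these lines (a simplified sketch with Spring's JmsTemplate, not the actual code; connection factory, queue name and payload are placeholders):
import org.springframework.jms.core.JmsTemplate;

// Sketch: high-priority messages are sent with priority 3, the bulk with priority 1.
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); // placeholder ConnectionFactory
jmsTemplate.setExplicitQosEnabled(true);                      // without this, setPriority() is ignored
jmsTemplate.setPriority(3);
jmsTemplate.convertAndSend("task.queue", payload);            // placeholder queue name and payload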
This works fine until the stars align and high-priority messages written into the queue stop being consumed - at least until I call Queue.removeMatchingMessages(String selector) to remove messages from the queue; which messages and how many doesn't seem to matter.
Luckily, I found a strong indicator of what's going on.
As can be seen in our UI, I submitted 444 messages with a higher priority (3) than the rest (1), but they're not getting consumed:
Inspecting the Queue with the debugger, I found that StoreQueueCursor.pendingCount is 444:
If I submit another 72 messages, the pending count is then 516 (444 + 72):
When I then remove the 72 messages using Queue.removeMatchingMessages(String selector), StoreQueueCursor.pendingCount becomes 0:
And my 444 messages are suddenly being consumed:
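Side note: the same counters can be read (and the workaround triggered) without the debugger via JMX. A minimal sketch, assuming the default local JMX endpoint and broker/queue names (all placeholders):
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.apache.activemq.broker.jmx.QueueViewMBean;

public class QueueCursorInspector {

    public static void main(String[] args) throws Exception {
        // Placeholder JMX URL; depends on how the JVM/broker exposes JMX.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();

            // Placeholder broker and queue names.
            ObjectName queueObjectName = new ObjectName(
                    "org.apache.activemq:type=Broker,brokerName=localhost,"
                    + "destinationType=Queue,destinationName=task.queue");
            QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(
                    connection, queueObjectName, QueueViewMBean.class, true);

            System.out.println("QueueSize          = " + queue.getQueueSize());
            System.out.println("InFlightCount      = " + queue.getInFlightCount());
            System.out.println("CursorPercentUsage = " + queue.getCursorPercentUsage());

            // The workaround from above, done via JMX instead of the debugger:
            // int removed = queue.removeMatchingMessages("JMSPriority = 1");
        }
    }
}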
So the best question I'm able to ask at this point is:
What is the purpose of StoreQueueCursor, and how does it prevent my messages from being consumed? Or rather: why aren't those messages written into the queue and ready to be consumed?
Any help is much appreciated.
I'm using org.apache.activemq:activemq-broker:5.15.12 (via Spring Boot 2.3.1.RELEASE).
Update
Interestingly enough, in the "happy case" where all my high-priority messages get processed as they should, the pendingCount is much higher than 0:
Update #2
ActiveMQ's How can I support priority queues? documentation says:
Since the message cursors (and client side) implement strict ordering of priorities, it’s possible to observe strict priority ordering if message dispatching can happen from the cache and not have to hit the disk (i.e., your consumers are fast enough to keep up with producers), or if you’re using non-persistent messages that never have to flush to disk (using the FilePendingMessageCursor). However, once you hit a situation where consumers are slow, or producers are just significantly faster, you’ll observe that the cache will fill up (possibly with lower priority messages) while higher priority messages get stuck on disk and not available until they’re paged in. In this case, you can make a decision to tradeoff optimized message dispatching for priority enforcement. You can disable the cache, message expiration check, and lower you consumer prefetch to 1 to ensure getting the high priority messages from the store ahead of lower priority messages
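For completeness, the "lower your consumer prefetch to 1" part of that advice would look roughly like this on the client side; it should be equivalent to the jms.prefetchPolicy URL option mentioned below (sketch, broker URL is a placeholder):
import org.apache.activemq.ActiveMQConnectionFactory;

// Sketch: prefetch of 1 so the consumer fetches messages from the broker one at a time.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connectionFactory.getPrefetchPolicy().setAll(1);
// equivalent URL form: tcp://localhost:61616?jms.prefetchPolicy.all=1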
So I tried disabling the cache like so (by the way, I already had jms.prefetchPolicy.all=0 set):
PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setQueue(JmsQueueNames.TASK_QUEUE_PREFIX + ".*");
policyEntry.setDeadLetterStrategy(taskDeadLetterStrategy);
policyEntry.setPrioritizedMessages(true);
policyEntry.setUseCache(false);
policyEntry.setExpireMessagesPeriod(0);
Now, useCache is false but cacheEnabled is true:
But the same behavior can be observed.
Also, I've always had broker persistence turned off, so I'm not sure whether the case described above even applies:
@Bean
public BrokerService broker(ActiveMQProperties properties, DispatcherProperties dispatcherProperties) throws Exception {
    BrokerService brokerService = new BrokerService();
    brokerService.setPersistent(false);
    brokerService.getProducerSystemUsage().getMemoryUsage().setLimit(dispatcherProperties.getActiveMq().getMemoryLimit());
    brokerService.addConnector(properties.getBrokerUrl());
    brokerService.setPlugins(getPluginsToLoad());
    brokerService.setDestinationPolicy(policyMap());
    return brokerService;
}
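The policyMap() referenced above just wraps the PolicyEntry shown earlier into a PolicyMap, roughly like this (simplified; policyEntry() stands in for however the entry is actually built):
import java.util.Collections;
import org.apache.activemq.broker.region.policy.PolicyMap;

private PolicyMap policyMap() {
    PolicyMap policyMap = new PolicyMap();
    // policyEntry() is assumed to return the PolicyEntry from further up
    // (prioritizedMessages=true, useCache=false, expireMessagesPeriod=0).
    policyMap.setPolicyEntries(Collections.singletonList(policyEntry()));
    return policyMap;
}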
Information from JMX: