ActiveMQ: Why do my messages get stuck in StoreQueueCursor?

I'm having a problem with ActiveMQ that is very difficult to pinpoint or even reproduce, so it is hard to ask a specific question. Please bear with me.

Basically I have:

producer (prioritized messages) -> queue -> consumer

Normally, there are a few hundred thousand messages in the queue, and whenever messages with a higher priority arrive, they are consumed first.

This works fine until the stars align and high-priority messages written into the queue do not get consumed, at least not until I call Queue.removeMatchingMessages(String selector) to remove messages from the queue. Which messages I remove, and how many, doesn't matter.

Luckily, I found a strong indicator of what's going on.

As can be seen in our UI, I submitted 444 messages with higher priority (3) than the rest (1) but they're not getting consumed:

[screenshot]

Inspecting the Queue with the debugger, I found that StoreQueueCursor.pendingCount is 444:

[screenshot]

If I submit another 72 messages, the pending count is then 516 (444 + 72):

[screenshot]

When I then remove the 72 messages using Queue.removeMatchingMessages(String selector), StoreQueueCursor.pendingCount becomes 0:

[screenshot]

And my 444 messages are suddenly being consumed:

[screenshot]

So the best question I'm able to ask at this point is:

What is the purpose of StoreQueueCursor, and how does it prevent my messages from being consumed? Or rather: why aren't those messages written into the queue and ready to be consumed?

Any help is much appreciated.

I'm using org.apache.activemq:activemq-broker:5.15.12 (via Spring Boot 2.3.1.RELEASE).

Update

Interestingly enough, in the "happy case" where all my high prio messages get processed as they should, the pendingCount is much higher than 0:

[screenshot]

Update #2

The ActiveMQ documentation page "How can I support priority queues?" says:

Since the message cursors (and client side) implement strict ordering of priorities, it’s possible to observe strict priority ordering if message dispatching can happen from the cache and not have to hit the disk (i.e., your consumers are fast enough to keep up with producers), or if you’re using non-persistent messages that never have to flush to disk (using the FilePendingMessageCursor). However, once you hit a situation where consumers are slow, or producers are just significantly faster, you’ll observe that the cache will fill up (possibly with lower priority messages) while higher priority messages get stuck on disk and not available until they’re paged in. In this case, you can make a decision to tradeoff optimized message dispatching for priority enforcement. You can disable the cache, message expiration check, and lower you consumer prefetch to 1 to ensure getting the high priority messages from the store ahead of lower priority messages
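The paging effect described in the quoted passage can be illustrated with a toy model in plain Java. This is only a sketch and not ActiveMQ's implementation: the class `PagedPriorityQueueSketch`, its methods, and the page size are all hypothetical. It assumes a store that hands messages over in arrival order and a bounded in-memory page that is sorted by priority, with dispatch happening only from the page. It does not claim to explain the behavior in this question, where persistence is off.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.PriorityQueue;

/** Toy model only: NOT ActiveMQ code. All names here are hypothetical. */
class PagedPriorityQueueSketch {

    static final class Msg {
        final long seq;      // arrival order
        final int priority;  // higher value = more urgent
        Msg(long seq, int priority) { this.seq = seq; this.priority = priority; }
    }

    // Stands in for the store: messages leave it strictly in arrival order.
    private final Deque<Msg> store = new ArrayDeque<>();
    // Stands in for the in-memory page: highest priority first, then oldest first.
    private final PriorityQueue<Msg> page = new PriorityQueue<>(
            Comparator.comparingInt((Msg m) -> -m.priority).thenComparingLong(m -> m.seq));
    private final int pageSize;
    private long nextSeq;

    PagedPriorityQueueSketch(int pageSize) { this.pageSize = pageSize; }

    void send(int priority) { store.addLast(new Msg(nextSeq++, priority)); }

    /** Fills the page from the store until the page is full or the store is empty. */
    private void pageIn() {
        while (page.size() < pageSize && !store.isEmpty()) {
            page.add(store.pollFirst());
        }
    }

    /** Dispatches the most urgent message that has been paged in, or null if none is left. */
    Msg receive() {
        pageIn();
        return page.poll();
    }

    public static void main(String[] args) {
        PagedPriorityQueueSketch queue = new PagedPriorityQueueSketch(3);
        for (int i = 0; i < 5; i++) queue.send(1); // backlog of low-priority messages
        queue.send(3);                              // two urgent messages arrive later
        queue.send(3);
        for (Msg m = queue.receive(); m != null; m = queue.receive()) {
            System.out.println("seq=" + m.seq + " priority=" + m.priority);
        }
    }
}
```

With a page size of 3, the dispatch order by priority in this model is 1, 1, 1, 3, 3, 1, 1: the two priority-3 messages only become visible once enough low-priority messages have left the page. A larger page, or a store that can itself return messages by priority, would change the outcome.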

So I tried disabling the cache as follows (I already had jms.prefetchPolicy.all=0 set):

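// Destination policy for all task queues
PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setQueue(JmsQueueNames.TASK_QUEUE_PREFIX + ".*");
policyEntry.setDeadLetterStrategy(taskDeadLetterStrategy);
policyEntry.setPrioritizedMessages(true); // enable priority support for these queues
policyEntry.setUseCache(false);           // disable the cursor cache
policyEntry.setExpireMessagesPeriod(0);   // 0 disables the periodic expiration check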

Now, useCache is false but cacheEnabled is true:

[screenshot]

But the same behavior can be observed.

Also, I have always had broker persistence turned off, so I'm not sure whether the case described above even applies:

@Bean
public BrokerService broker(ActiveMQProperties properties, DispatcherProperties dispatcherProperties) throws Exception {
  BrokerService brokerService = new BrokerService();
  brokerService.setPersistent(false);
  brokerService.getProducerSystemUsage().getMemoryUsage().setLimit(dispatcherProperties.getActiveMq().getMemoryLimit());
  brokerService.addConnector(properties.getBrokerUrl());
  brokerService.setPlugins(getPluginsToLoad());
  brokerService.setDestinationPolicy(policyMap());
  return brokerService;
}

Information from JMX:

[screenshot]
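The same JMX information can also be read as text instead of a screenshot. The following is a minimal sketch using only the JDK's javax.management API. It assumes the embedded broker has JMX enabled and registers its MBeans with the platform MBean server, that the broker name is "localhost", and that queue MBeans follow the ObjectName layout used since ActiveMQ 5.8. The class `QueueJmxDump` and the queue name "task.example" are made up for illustration. The code lists whatever attributes the MBean exposes rather than assuming specific attribute names.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Sketch: print all readable JMX attributes of a queue MBean. Names are hypothetical. */
class QueueJmxDump {

    /** Builds the queue ObjectName; assumes names without characters that need quoting. */
    static ObjectName queueObjectName(String brokerName, String queueName) {
        try {
            return new ObjectName("org.apache.activemq:type=Broker,brokerName=" + brokerName
                    + ",destinationType=Queue,destinationName=" + queueName);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Name would need quoting: " + queueName, e);
        }
    }

    /** Prints every readable attribute and returns how many were printed. */
    static int dump(MBeanServer server, ObjectName name) {
        if (!server.isRegistered(name)) {
            System.out.println("No MBean registered under " + name);
            return 0;
        }
        int printed = 0;
        try {
            for (MBeanAttributeInfo attr : server.getMBeanInfo(name).getAttributes()) {
                if (!attr.isReadable()) {
                    continue;
                }
                try {
                    Object value = server.getAttribute(name, attr.getName());
                    System.out.println(attr.getName() + " = " + value);
                    printed++;
                } catch (Exception e) {
                    // Some attributes may fail to resolve; report and keep going.
                    System.out.println(attr.getName() + " = <unavailable: " + e + ">");
                }
            }
        } catch (Exception e) {
            System.out.println("Could not inspect " + name + ": " + e);
        }
        return printed;
    }

    public static void main(String[] args) {
        // Must run inside the same JVM as the embedded broker to see its MBeans.
        ObjectName name = queueObjectName("localhost", "task.example");
        dump(ManagementFactory.getPlatformMBeanServer(), name);
    }
}
```

Calling `dump` once while the messages are stuck and once in the "happy case" would give two attribute listings that can be compared line by line.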

Bang answered 19/8, 2020 at 9:8. Comments (3):
Have you tried creating multiple queues based on priority? - Hoboken
Have you tried this on ActiveMQ Artemis? - Prosaism
I haven't tried one queue per priority. I think this would increase complexity a lot, as consumers would have to read the queues in the right order. I'm not sure whether I could achieve this with composite queues. I haven't tried Artemis either, but I might if I can't find a solution. - Bang
