Slow message consumption using AmazonSQSClient
So, I used concurrency 50-100 in Spring JMS, allowing up to 200 max connections. Everything works as expected, but the problem appears when I try to retrieve 100k messages from the queue, i.e. there are 100k messages on my SQS queue and I am reading them through the normal Spring JMS approach.

@JmsListener
public void process(String message) {
    count++; // note: not thread-safe under concurrent consumers
    System.out.println(count);
    // code
}

I am seeing all the logs in my console, but after around 17k messages it starts throwing exceptions,

something like: AWS SDK exception: port already in use.

Why do I see this exception, and how do I get rid of it?

I tried looking on the internet for it but couldn't find anything.

My settings:

Concurrency: 50-100

Messages per task: 50

Acknowledge mode: CLIENT_ACKNOWLEDGE

timestamp=10:27:57.183, level=WARN , logger=c.a.s.j.SQSMessageConsumerPrefetch, message={ConsumerPrefetchThread-30} Encountered exception during receive in ConsumerPrefetch thread,
javax.jms.JMSException: AmazonClientException: receiveMessage.
    at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.handleException(AmazonSQSMessagingClientWrapper.java:422)
    at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.receiveMessage(AmazonSQSMessagingClientWrapper.java:339)
    at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessages(SQSMessageConsumerPrefetch.java:248)
    at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:207)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Address already in use: connect

Update: I looked into the problem, and it seems that new sockets keep being created until every socket is exhausted.

My Spring JMS version is 4.3.10.

To replicate this problem, use the above configuration with max connections set to 200 and concurrency set to 50-100, and push some 40k messages to the SQS queue. One can use https://github.com/adamw/elasticmq as a local server that replicates Amazon SQS. Comment out the @JmsListener annotation and use SoapUI load testing to call send message and fire many messages; because @JmsListener is commented out, the application won't consume messages from the queue. Once you see that you have sent 40k messages, stop, uncomment @JmsListener, and restart the server. A sketch of wiring the client to local ElasticMQ follows.
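
For reference, a minimal sketch of pointing the AWS SDK v1 client at a local ElasticMQ instance. This assumes ElasticMQ's default port 9324; the region value is arbitrary for a local endpoint:

import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;

// ElasticMQ listens on http://localhost:9324 by default
AmazonSQS amazonSQSclient = AmazonSQSClientBuilder.standard()
        .withEndpointConfiguration(
                new AwsClientBuilder.EndpointConfiguration("http://localhost:9324", "elasticmq"))
        .build();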

Update:

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setDestinationResolver(new DynamicDestinationResolver());
    factory.setErrorHandler(Throwable::printStackTrace);
    factory.setConcurrency("50-100");
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    return factory;
}

Update:

SQSConnectionFactory connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), amazonSQSclient);

Update:

Client configuration details :

Protocol: HTTP
Max connections: 200
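
In code form, that client configuration presumably looks something like the following sketch using the AWS SDK v1 ClientConfiguration (the variable names are illustrative):

import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;

ClientConfiguration clientConfiguration = new ClientConfiguration()
        .withProtocol(Protocol.HTTP)   // plain HTTP, as in the settings above
        .withMaxConnections(200);      // max pooled HTTP connections

AmazonSQS amazonSQSclient = AmazonSQSClientBuilder.standard()
        .withClientConfiguration(clientConfiguration)
        .build();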

Update:

I used the CachingConnectionFactory class, and it seems that's discouraged. I read on Stack Overflow and in the official documentation not to use CachingConnectionFactory with DefaultJmsListenerContainerFactory:

https://mcmap.net/q/812969/-why-defaultmessagelistenercontainer-should-not-use-cachingconnectionfactory

It gives the same error that I got before, though.

Update:

My goal is to reach 500 TPS, i.e. I should be able to consume that many messages per second. With this approach it seems I can reach 100-200, but not more than that, and it becomes a blocker at high concurrency. If you have a better solution to achieve that rate, I am all ears.

Update:

I am using AmazonSQSClient.

Faveolate answered 11/12, 2018 at 15:39 Comment(13)
You can include the stack trace, just redact anything sensitive. – Cymograph
@Cymograph Please see. – Faveolate
This answer seems like it would apply to your situation - you might be able to tweak your AWS ClientConfiguration settings for the SQS client to work around this, but I'm not sure off the top of my head which ones (I'll turn this into a proper answer if I figure that out). Alternatively, you can distribute your requests across multiple hosts. – Cymograph
I am setting a proxy port in the client configuration as well. Could that be the reason? I saw this answer too and could think of applying it to this situation. – Faveolate
I found the problem but don't know how to apply the fix with JMS. It is creating sockets internally, one after another, until it hits the maximum number of ports and tries to reuse a socket. Since all of them are in the TIME_WAIT state, it throws this exception. I don't know how to close those sockets. – Faveolate
Could you provide the configuration for your JmsListenerContainerFactory? – Alternative
Please see, @dean. – Faveolate
I assume the connectionFactory is an SQSConnectionFactory; can we see the configuration for this as well? – Alternative
Yes. Updated... Dean. – Faveolate
One more request: could you show how you create the AmazonSQSClient, specifically the client configuration? – Alternative
Updated... Dean. – Faveolate
I am creating it through the builder. – Faveolate
Let us continue this discussion in chat. – Alternative

Starvation on the Consumer

One possible optimization that JMS clients tend to implement is a message consumption buffer, or "prefetch". This buffer is sometimes tunable via the number of messages or a buffer size in bytes.

The intention is to prevent the consumer from going to the server every single time it receives a message, by pulling multiple messages in a batch instead.

In an environment where you have many "fast consumers" (which is the opinionated view these libraries may take), this prefetch is set to a somewhat high default in order to minimize these round trips.

However, in an environment with slow message consumers, this prefetch can be a problem: a slow consumer sits on prefetched messages that a faster consumer could otherwise be processing. In a highly concurrent environment, this can cause starvation quickly.

That being the case, the SQSConnectionFactory has a property for this:

SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), amazonSQSclient);
sqsConnectionFactory.setNumberOfMessagesToPrefetch(0);
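
With prefetching disabled this way, each consumer should only request a message from SQS when it is actually ready to process one, so nothing sits buffered behind a slow consumer; the likely tradeoff is one receiveMessage round trip per message, which costs some throughput.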

Starvation on the Producer (i.e. via JmsTemplate)

It's very common for these JMS implementations to expect to be interfaced to the broker via some intermediary. These intermediaries actually cache and reuse connections, or use a pooling mechanism to reuse them. In the Java EE world, this is usually taken care of by a JCA adapter or another facility of a Java EE server.

Because of the way Spring JMS works, it expects an intermediary delegate for the ConnectionFactory to exist to do this caching/pooling. Otherwise, when Spring JMS wants to talk to the broker, it will attempt to open a new connection and session (!) every time.

To solve this, Spring provides a few options. The simplest is the CachingConnectionFactory, which caches a single Connection and allows many Sessions to be opened on that Connection. A simple way to add this to your @Configuration above would be something like:

@Bean
public ConnectionFactory connectionFactory(AmazonSQSClient amazonSQSclient) {

    SQSConnectionFactory sqsConnectionFactory =
            new SQSConnectionFactory(new ProviderConfiguration(), amazonSQSclient);

    // Doing the following is key!
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
    cachingConnectionFactory.setTargetConnectionFactory(sqsConnectionFactory);
    // Set the cachingConnectionFactory properties to your liking here...

    return cachingConnectionFactory;

}

If you want something fancier as a JMS pooling solution (one that will pool Connections and MessageProducers for you, in addition to multiple Sessions), you can use the reasonably new PooledJMS project's JmsPoolConnectionFactory, or the like, from their library.
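
A minimal sketch of that alternative, assuming the PooledJMS artifact (org.messaginghub:pooled-jms) is on the classpath and its recent method names; the pool sizes shown are illustrative, not recommendations:

import org.messaginghub.pooled.jms.JmsPoolConnectionFactory;

@Bean
public ConnectionFactory connectionFactory(AmazonSQSClient amazonSQSclient) {

    SQSConnectionFactory sqsConnectionFactory =
            new SQSConnectionFactory(new ProviderConfiguration(), amazonSQSclient);

    JmsPoolConnectionFactory pooledFactory = new JmsPoolConnectionFactory();
    pooledFactory.setConnectionFactory(sqsConnectionFactory);
    pooledFactory.setMaxConnections(1);              // illustrative: one shared Connection
    pooledFactory.setMaxSessionsPerConnection(100);  // illustrative: enough Sessions for concurrency 50-100

    return pooledFactory;
}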

Snoddy answered 21/12, 2018 at 5:1 Comment(8)
Anything else that you might want me to add or tweak? – Faveolate
That's all I'm hoping you need to fix your connection starvation issue (which seems to be the last and main issue you were seeing). – Snoddy
A new socket is being created on the go; I mean, it's the same thing. – Faveolate
Didn't fully realize it was all on the consumer side. Just updated the answer for you. – Snoddy
Yes, the starvation is on the consumer side; the producer seems to be working perfectly fine. Plus, doesn't new ProviderConfiguration() have a default value set to zero? – Faveolate
There's also a maxBatch setting docs.aws.amazon.com/AWSSimpleQueueService/latest/… on the Async client. Which are you using? – Snoddy
See my question: for the provider configuration I haven't passed any value :(, so it's the default one. – Faveolate
If you're going to go with the default SQS client, figure out how fast your slowest consumer is consuming messages. I would time it and see how long it's taking; it will help your other consumers. Otherwise, look into using the Async consumer, but consider the tradeoffs. Also, for SO documentation purposes, consolidate or delete some of the above comments so it will be easier for readers to get up to speed on this issue. – Snoddy
