SpringBoot @SqsListener - not working - with Exception - TaskRejectedException

6

16

I have an AWS SQS queue with 5000 messages already on it (a sample message looks like 'Hello @ 1'). I created a Spring Boot application and, inside one of the component classes, created a method to read messages from the queue.

package com.example.aws.sqs.service;

import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class MessageReceiverService {   

@SqsListener(value = { "${cloud.aws.sqs.url}" }, deletionPolicy = SqsMessageDeletionPolicy.ALWAYS)
public void readMessage(String message){
    log.info("Reading Message... {}", message);
}

}

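My main Spring Boot class:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class AwsSqsApplicationConsumer {
    public static void main(String[] args) {
        SpringApplication.run(AwsSqsApplicationConsumer.class, args);
    }
}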

The exception I get when the application runs:

s.c.a.m.l.SimpleMessageListenerContainer : An Exception occurred while polling queue '<my sqs name>'. The failing operation will be retried in 10000 milliseconds
org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 20]] did not accept task: org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@1cbd9ef2
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:309) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$AsynchronousMessageListener.run(SimpleMessageListenerContainer.java:286) ~[spring-cloud-aws-messaging-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_65]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_65]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]
Caused by: java.util.concurrent.RejectedExecutionException: Task org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@1cbd9ef2 rejected from java.util.concurrent.ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 2, queued tasks = 0, completed tasks = 20]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) ~[na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) [na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) [na:1.8.0_65]
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:306) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
... 6 common frames omitted

I am NOT configuring any custom executor services; I am using the preconfigured Spring beans.

springBootVersion = '2.0.3.RELEASE'
springCloudVersion = 'Finchley.RELEASE'

Fragrance answered 17/7, 2018 at 4:4 Comment(4)
The error seems to be in SimpleMessageListenerContainer. Where is this code? - Lunt
org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer is a Spring class, shipped with spring-cloud-aws-messaging-2.0.0.RELEASE.jar. - Fragrance
Hi, how did you solve this issue? - Elzaelzevir
@AnkitaAgrawal I have stopped using '@SqsListener' and started using the AmazonSqs client with Spring's '@Scheduled' and a fixedRate. - Fragrance
D
17

Setting the max number of messages seems to solve the issue:

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQS){
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(amazonSQS);
    factory.setMaxNumberOfMessages(10);
    return factory;
}
Divergence answered 12/10, 2018 at 9:42 Comment(2)
Adding some explanation about how this works: spring-cloud-aws sets the number of threads to the number of messages if the number of messages has been explicitly set; otherwise it sets it to 2, even though the default number of messages is 10. - Goofball
I'm really interested in this answer. Can you please post the full source code? I'm trying to implement a similar mechanism. - Zoom
B
5

I believe it is a bug or oversight in Spring. The issue stems from the default values of:

public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {

    private static final int DEFAULT_WORKER_THREADS = 2;

and

abstract class AbstractMessageListenerContainer implements InitializingBean, DisposableBean, SmartLifecycle, BeanNameAware {
    private static final int DEFAULT_MAX_NUMBER_OF_MESSAGES = 10;

If maxNumberOfMessages is not set, the container pulls up to 10 messages per poll from SQS but gives the task executor only 2 worker threads. This means that if a poll returns 3 or more messages, you get that exception. If you manually set maxNumberOfMessages to a value (any value), it is used in both places, keeping the two in sync, which I believe is the intended behaviour:

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(
            SimpleMessageListenerContainerFactory factory, QueueMessageHandler messageHandler)
    {
        SimpleMessageListenerContainer container = factory.createSimpleMessageListenerContainer();
        container.setMaxNumberOfMessages(5);
        container.setMessageHandler(messageHandler);
        return container;
    }
Burnett answered 6/3, 2020 at 7:57 Comment(0)
P
3

The problem is with the listener thread configuration. See the following:

...
ThreadPoolExecutor@7c1594a5[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 20]]
...

The default thread pool size is smaller than what you need.

Add the following configuration to your Spring application:

@Configuration
public class TasksConfiguration implements SchedulingConfigurer {

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(5); // TODO: Load this from configuration
        taskScheduler.initialize();
        taskRegistrar.setTaskScheduler(taskScheduler);
    }
}

Now, you should be able to process these tasks.

P.S. Any tasks that were rejected earlier will be picked up again after a certain period.

Edit: I think people are getting scared by the number in the line .setPoolSize(5000). It's a configurable number; choose whatever suits your requirements. For this answer, I've reduced it to a smaller number.

Partitive answered 30/7, 2018 at 7:9 Comment(2)
So as per your solution, do I need as many threads as there are messages in the SQS queue? 5000 is a big number to configure as the pool size, assuming you have factored in that this could run on just a low-core CPU. - Fragrance
I think it's pretty clear that it's a configurable number. You have to choose a number according to your application's requirements (obviously I can't choose it for you). Also, you don't have to set it to the number of messages in SQS. It depends on how many messages you want to process simultaneously. If you want to process them all simultaneously, you can set it to 5000. If you don't mind processing them one by one, you can set it to one. They will be processed later, as mentioned in the answer, but the number of messages will continue to pile up. That's your decision. - Partitive
E
2

I cannot comment on the previous answers, so I'm posting this to explain further why the issue occurs and why setting MaxNumberOfMessages works. Hopefully, the following clarifies everything.

SimpleMessageListenerContainer's ThreadPoolTaskExecutor is configured with a core pool size of 2 threads, a max pool size of 3 threads and a queue capacity of 0. However, the default max number of messages returned by a single poll to Amazon SQS is 10. This means that if 10 messages are available in a single poll, there won't be enough threads to process them, and the RejectedExecutionException is thrown.
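
The mismatch described above can be reproduced without Spring or AWS. The following is a minimal sketch using plain java.util.concurrent, assuming that a queue capacity of 0 corresponds to a SynchronousQueue and that slow handlers can be modelled by tasks blocking on a latch. The class and method names are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RejectionDemo {

    /**
     * Submits up to {@code tasks} blocking tasks to a pool that cannot queue
     * anything and returns how many were accepted before the first rejection.
     */
    static int acceptedBeforeRejection(int corePoolSize, int maxPoolSize, int tasks) {
        // A SynchronousQueue never holds a task: each one needs a free thread.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // stay busy, like a handler still working
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // The default AbortPolicy throws once every thread is occupied.
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        // 11 tasks = 1 polling task + a batch of 10 messages.
        System.out.println(acceptedBeforeRejection(2, 3, 11));  // prints 3
        System.out.println(acceptedBeforeRejection(2, 11, 11)); // prints 11
    }
}
```

With a max pool size of 3, only three tasks are ever accepted at once. The stack trace in the question suggests the polling loop itself runs on the same executor, which would leave just two threads for message handlers.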

Calling setMaxNumberOfMessages(10) on SimpleMessageListenerContainerFactory sets the max thread pool size to 11, which should make enough threads available. It doesn't set the queue capacity.
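
The numbers reported in this thread (a max pool size of 3 by default, 11 after setting 10) fit a simple rule. The sketch below is a simplified model inferred from those numbers for a single queue, not the library's actual code; the class name, the method name and the "plus one thread for polling" interpretation are assumptions.

```java
final class PoolSizing {

    // Default worker thread count quoted from SimpleMessageListenerContainer in this thread.
    static final int DEFAULT_WORKER_THREADS = 2;

    /**
     * Models the max pool size for one queue: the worker count plus one
     * extra thread (assumed here to be the one doing the polling).
     *
     * @param maxNumberOfMessages the explicitly configured value, or null if unset
     */
    static int maxPoolSize(Integer maxNumberOfMessages) {
        int workers = (maxNumberOfMessages != null)
                ? maxNumberOfMessages
                : DEFAULT_WORKER_THREADS;
        return workers + 1;
    }

    public static void main(String[] args) {
        System.out.println(maxPoolSize(null)); // prints 3, matching the pool size in the stack trace
        System.out.println(maxPoolSize(10));   // prints 11
    }
}
```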

To set the queue capacity, a separate TaskExecutor can be initialised and set on the SimpleMessageListenerContainerFactory bean as follows:

@Bean(name = "sqsAsyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor(@Value("${external.aws.sqs.core-thread-count}") int coreThreadCount,
                                           @Value("${external.aws.sqs.max-thread-count}") int maxThreadCount,
                                           @Value("${external.aws.sqs.queue-capacity}") int queueCapacity) {
    ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor();
    asyncTaskExecutor.setCorePoolSize(coreThreadCount);
    asyncTaskExecutor.setMaxPoolSize(maxThreadCount);
    asyncTaskExecutor.setQueueCapacity(queueCapacity);
    asyncTaskExecutor.setThreadNamePrefix("threadPoolExecutor-SimpleMessageListenerContainer-");
    asyncTaskExecutor.initialize();
    return asyncTaskExecutor;
}

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQS, @Qualifier("sqsAsyncTaskExecutor") AsyncTaskExecutor asyncTaskExecutor) {
    SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory = new SimpleMessageListenerContainerFactory();
    // Wire in the injected client, as the earlier answer does
    simpleMessageListenerContainerFactory.setAmazonSqs(amazonSQS);
    simpleMessageListenerContainerFactory.setTaskExecutor(asyncTaskExecutor);
    return simpleMessageListenerContainerFactory;
}

The values I used were coreThreadCount = 5, maxThreadCount = 20, queueCapacity = 10.

As I've already said, I think calling setMaxNumberOfMessages(10) on SimpleMessageListenerContainerFactory should be enough to process all the messages fetched in a single request. However, if you feel you need more precise control over the TaskExecutor, this configuration works as well.
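
To get a feel for how much headroom those values give, here is a minimal sketch with a plain ThreadPoolExecutor. It assumes a positive queue capacity behaves like a bounded LinkedBlockingQueue, which is my understanding of Spring's ThreadPoolTaskExecutor; the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class QueueCapacityDemo {

    /**
     * Submits up to {@code tasks} blocking tasks and returns how many the
     * pool accepted (running or queued) before the first rejection.
     */
    static int acceptedBeforeRejection(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // hold the thread until the experiment ends
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // Thrown only when all threads are busy AND the queue is full.
        } finally {
            release.countDown(); // unblock running tasks so queued ones can drain
            pool.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        // core = 5, max = 20, queue = 10: 20 running + 10 queued
        System.out.println(acceptedBeforeRejection(5, 20, 10, 40)); // prints 30
    }
}
```

Note that a ThreadPoolExecutor only grows beyond its core size once the queue is full, so with these values the first 5 tasks run immediately, the next 10 wait in the queue, and only then are the remaining 15 threads started.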

Ellmyer answered 24/11, 2019 at 13:58 Comment(0)
V
2

Try adding the @EnableSqs annotation to the MessageReceiverService class.

Vitriolize answered 19/7, 2021 at 19:38 Comment(1)
This resolved it for me. - Glean
A
1

I solved this problem using a Spring event listener. The code follows; I hope it helps.

In the following solution, once all bean initialization has finished, a new task executor with a bigger pool size is assigned to the container.

@Component
public class PostBeansConstructionListener{

    @EventListener
    public void handleContextRefreshedEvent(ContextRefreshedEvent event){
        final ApplicationContext applicationContext = event.getApplicationContext();
        final SimpleMessageListenerContainer simpleMessageListenerContainer = applicationContext.getBean(SimpleMessageListenerContainer.class);
        setAsyncTaskExecutor(simpleMessageListenerContainer);
    }

    private void setAsyncTaskExecutor(SimpleMessageListenerContainer simpleMessageListenerContainer) {
        try{
            simpleMessageListenerContainer.setTaskExecutor(getAsyncExecutor());
        }catch(Exception ex){
            throw new RuntimeException("Not able to create Async Task Executor for SimpleMessageListenerContainer.", ex);
        }
    }

    public AsyncTaskExecutor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(7);
        executor.setMaxPoolSize(42);
        executor.setQueueCapacity(11);
        executor.setThreadNamePrefix("threadPoolExecutor-SimpleMessageListenerContainer-");
        executor.initialize();
        return executor;
    }
}
According answered 7/10, 2019 at 9:9 Comment(0)
