@SQSListener not using concurrency settings
Asked Answered
K

1

8

I'm building a Spring Boot application that consumes an AWS SQS queue. I'm able to consume the queue, however I can't seem to do so with multiple concurrent consumers.

When running from the command line mvn spring-boot:run, it connects to the SQS and will receive the messages one at a time. The listener method has additional code which causes a several second delay, during which I would expect the second message to be received and processed.

It's as if the configuration isn't being used.

pom.xml:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.5.RELEASE</version>
    <relativePath /> <!-- lookup parent from repository -->
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <spring-cloud.version>Finchley.SR1</spring-cloud.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-aws</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-aws-messaging</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk</artifactId>
        <version>1.11.419</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-aws</artifactId>
            <version>2.0.0.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

main class:

@SpringBootApplication
public class IngestConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(IngestConsumerApplication.class, args);
    }
}

Configuration class:

@Configuration
public class AppConfig {

@Value("${aws.region}")
private String region;

@Bean
@Primary
public AWSCredentialsProvider buildAWSCredentialsProvider() {
    return new ProfileCredentialsProvider();
}

@Bean
@Primary
public AmazonS3 buildS3Client(@Autowired AWSCredentialsProvider credentials) {
    return AmazonS3ClientBuilder
            .standard()
            .withCredentials(credentials)
            .withRegion(region)
            .build();
}

@Bean
@Primary
public AmazonSQSAsync amazonSQSAsync(@Autowired AWSCredentialsProvider credentials) {
    return AmazonSQSAsyncClientBuilder.standard()
            .withCredentials(credentials)
            .withRegion(region)
            .build();
}

@Bean
public QueueMessageHandler queueMessageHandler(@Autowired AmazonSQSAsync sqsClient, @Autowired QueueMessageHandlerFactory queueMessageHandlerFactory) {
    queueMessageHandlerFactory.setAmazonSqs(sqsClient);
    QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
    return queueMessageHandler;
}

@Bean
public ObjectMapper buildJacksonObjectMapper() {
    return new ObjectMapper();
}

@Bean
public QueueMessageHandlerFactory queueMessageHandlerFactory() {
    QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
    MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();

    //set strict content type match to false
    messageConverter.setStrictContentTypeMatch(false);
    factory.setArgumentResolvers(Collections.<HandlerMethodArgumentResolver>singletonList(new PayloadArgumentResolver(messageConverter)));
    return factory;
}

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(@Autowired AmazonSQSAsync amazonSQSAsync, 
        @Autowired QueueMessageHandler queueMessageHandler,
        @Autowired ThreadPoolTaskExecutor threadPoolExecutor) {
    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
    simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
    simpleMessageListenerContainer.setMaxNumberOfMessages(2);
    simpleMessageListenerContainer.setTaskExecutor(threadPoolExecutor);

    return simpleMessageListenerContainer;
}

@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(2);
    executor.setThreadNamePrefix("queueExec");
    executor.initialize();
    return executor;
}

}

An finally, the consumer:

@Async
@SqsListener(deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS, value = "${queue}")
public void listener(S3EventNotification event) {
    ...
}
Kelvinkelwen answered 2/10, 2018 at 3:51 Comment(0)
I
0

Remove the @Async tag off the @SqsListner tagged method for starters. I do not believe that will give you anything as the second tag is evaluated at runtime by a different process and probably doesn't get the proxy object that facilitates the Async anyways.

Also "simpleMessageListenerContainer.setMaxNumberOfMessages(2);" is how many messages you can pull back in a single poll of SQS. The max is 10. But this isn't concurrency however it DOES give the process more messages to call more listener instances. So that value would better facilitate concurrency but not guarantee it.

Impenitent answered 13/2, 2022 at 19:18 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.