I'm building a Spring Boot application that consumes an AWS SQS queue. I'm able to consume the queue, however I can't seem to do so with multiple concurrent consumers.
When running from the command line mvn spring-boot:run
, it connects to the SQS and will receive the messages one at a time. The listener method has additional code which causes a several second delay, during which I would expect the second message to be received and processed.
It's as if the configuration isn't being used.
pom.xml:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Finchley.SR1</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-aws</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-aws-messaging</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.11.419</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>2.0.0.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
main class:
@SpringBootApplication
public class IngestConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(IngestConsumerApplication.class, args);
}
}
Configuration class:
@Configuration
public class AppConfig {
@Value("${aws.region}")
private String region;
@Bean
@Primary
public AWSCredentialsProvider buildAWSCredentialsProvider() {
return new ProfileCredentialsProvider();
}
@Bean
@Primary
public AmazonS3 buildS3Client(@Autowired AWSCredentialsProvider credentials) {
return AmazonS3ClientBuilder
.standard()
.withCredentials(credentials)
.withRegion(region)
.build();
}
@Bean
@Primary
public AmazonSQSAsync amazonSQSAsync(@Autowired AWSCredentialsProvider credentials) {
return AmazonSQSAsyncClientBuilder.standard()
.withCredentials(credentials)
.withRegion(region)
.build();
}
@Bean
public QueueMessageHandler queueMessageHandler(@Autowired AmazonSQSAsync sqsClient, @Autowired QueueMessageHandlerFactory queueMessageHandlerFactory) {
queueMessageHandlerFactory.setAmazonSqs(sqsClient);
QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
return queueMessageHandler;
}
@Bean
public ObjectMapper buildJacksonObjectMapper() {
return new ObjectMapper();
}
@Bean
public QueueMessageHandlerFactory queueMessageHandlerFactory() {
QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
//set strict content type match to false
messageConverter.setStrictContentTypeMatch(false);
factory.setArgumentResolvers(Collections.<HandlerMethodArgumentResolver>singletonList(new PayloadArgumentResolver(messageConverter)));
return factory;
}
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(@Autowired AmazonSQSAsync amazonSQSAsync,
@Autowired QueueMessageHandler queueMessageHandler,
@Autowired ThreadPoolTaskExecutor threadPoolExecutor) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
simpleMessageListenerContainer.setMaxNumberOfMessages(2);
simpleMessageListenerContainer.setTaskExecutor(threadPoolExecutor);
return simpleMessageListenerContainer;
}
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setThreadNamePrefix("queueExec");
executor.initialize();
return executor;
}
}
An finally, the consumer:
@Async
@SqsListener(deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS, value = "${queue}")
public void listener(S3EventNotification event) {
...
}