SQS message visibility timeout being set to 0 when an exception is thrown in a @JmsListener

I have a simple Spring Boot service that listens to an AWS SQS queue using JMSTemplate. Everything works as expected when the message is properly handled.

I am using CLIENT_ACKNOWLEDGE, so when an exception is thrown during processing, the message is received again. However, the Default Visibility Timeout setting on the SQS queue is ignored and the message is received again immediately.

The SQS queue is configured with a 30 second Default Visibility Timeout and a re-drive policy of 20 receives before putting the message on a DLQ.

I have disabled the service and used the SQS Console to verify that the Default Visibility Timeout is properly set. I have also tried adding the JMS Message to the method signature and performing manual validation.

Here is code for the JMS Configuration:

@Configuration
@EnableJms
class JmsConfig
{

    @Bean
    @Conditional(AWSEnvironmentCondition.class)
    public SQSConnectionFactory connectionFactory(@Value("${AWS_REGION}") String awsRegion)
    {
        return new SQSConnectionFactory(
            new ProviderConfiguration(),
            AmazonSQSClientBuilder.standard()
                                  .withRegion(Regions.fromName(awsRegion))
                                  .withCredentials(new DefaultAWSCredentialsProviderChain())
        );
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory)
    {
        DefaultJmsListenerContainerFactory factory =
            new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setDestinationResolver(new DynamicDestinationResolver());
        factory.setConcurrency("3-10");
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        factory.setErrorHandler(defaultErrorHandler());
        return factory;
    }

    @Bean
    public ErrorHandler defaultErrorHandler()
    {
        return new ErrorHandler()
        {
            @Override
            public void handleError(Throwable throwable)
            {
                LOG.error("JMS message listener error: {}", throwable.getMessage());
            }
        };
    }

    @Bean
    public JmsTemplate defaultJmsTemplate(ConnectionFactory connectionFactory)
    {
        return new JmsTemplate(connectionFactory);
    }
}

And here is code for the Listener:

@Component
public class MessagingListener
{
    @Autowired
    private MessageService _messageService;

    @Autowired
    private Validator _validator;

    @JmsListener(destination = "myqueue")
    public void receiveMessage(String messageJson)
    {
        try
        {
            LOG.info("Received message");

            // The following line throws an IOException if the message is not valid JSON.
            MyMessage myMessage = MAPPER.readValue(messageJson, MyMessage.class);

            Set<ConstraintViolation<MyMessage>> violations = _validator.validate(myMessage);
            if (CollectionUtils.isNotEmpty(violations))
            {
                // Build one "property : message" entry per violation and join them.
                String errorMessage = violations.stream()
                        .map(v -> String.join(" : ", v.getPropertyPath().iterator().next().getName(),
                                v.getMessage()))
                        .collect(Collectors.joining(", "));
                LOG.error("Exception occurred while validating the model, details: {}", errorMessage);
                throw new ValidationException(errorMessage);
            }
        }
        catch (IOException e)
        {
            LOG.error("Error parsing message", e);
            throw new ValidationException("Error parsing message, details: " + e.getMessage());
        }
    }
}

When a message is placed on the SQS queue with either invalid JSON or JSON that does not pass validation, the message is received 20 times in rapid succession and then ends up on the DLQ. What needs to be done so that the Default Visibility Timeout setting in SQS is respected?

Nett answered 20/5, 2019 at 21:55 Comment(2)
Did you find a solution? – Ludivinaludlew
Also facing this issue, any insights? – Perennial

When the listener throws an exception, the visibility timeout of the failed message is set to 0 via ChangeMessageVisibility, so SQS redelivers the message immediately even though the queue has a different visibility timeout configured.

How does that happen?

In short, Spring JMS's AbstractMessageListenerContainer does the following:

try {
    invokeListener(session, message); // This is your @JmsListener method
}
catch (JMSException | RuntimeException | Error ex) {
    rollbackOnExceptionIfNecessary(session, ex);
    throw ex;
}
commitIfNecessary(session, message);

Inside rollbackOnExceptionIfNecessary, session.recover() is invoked because:

  1. session.getTransacted() will always return false, since SQS does not support transactions.
  2. isClientAcknowledge(session) will return true, because you are using CLIENT_ACKNOWLEDGE mode.
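The two conditions above can be sketched as a small decision function. This is a simplified stand-alone model, not Spring's actual source: the class name RollbackDecisionSketch, the Action enum and the method onListenerException are hypothetical names introduced only for illustration.

```java
// Simplified model of the branch described above; NOT Spring's real code.
// All names here (RollbackDecisionSketch, Action, onListenerException) are
// hypothetical and exist only for this illustration.
final class RollbackDecisionSketch {

    enum Action { ROLLBACK, RECOVER, NONE }

    /**
     * @param transacted        what session.getTransacted() would return
     * @param clientAcknowledge whether the session uses CLIENT_ACKNOWLEDGE
     * @return what the container would do after the listener throws
     */
    static Action onListenerException(boolean transacted, boolean clientAcknowledge) {
        if (transacted) {
            // Transacted sessions are rolled back instead of recovered.
            return Action.ROLLBACK;
        }
        if (clientAcknowledge) {
            // Non-transacted + CLIENT_ACKNOWLEDGE: session.recover() is called.
            return Action.RECOVER;
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        // SQS case from the question: never transacted, CLIENT_ACKNOWLEDGE.
        System.out.println(onListenerException(false, true)); // prints RECOVER
    }
}
```

With the question's setup (not transacted, CLIENT_ACKNOWLEDGE) the model always lands in the recover branch, which is why every listener exception ends in session.recover().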

Finally, recover() of SQSSession negatively acknowledges the message, which means it sets the visibility timeout of that specific message to 0. This causes SQS to redeliver the message immediately.
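To see why this burns through the redrive limit so quickly, here is a toy timeline model. It does not call SQS or any AWS API; the class VisibilityTimeoutSketch and its method are hypothetical, and the model assumes a single message, a listener that always fails, and a consumer that receives the message the moment it becomes visible.

```java
// Toy timeline model of one always-failing message; no AWS calls involved.
// VisibilityTimeoutSketch and secondsUntilLastReceive are hypothetical names.
final class VisibilityTimeoutSketch {

    /**
     * Returns the simulated time, in seconds, at which the final receive
     * happens (after which the redrive policy would move the message to the DLQ).
     *
     * @param maxReceives              redrive limit (20 in the question)
     * @param defaultVisibilitySeconds queue's Default Visibility Timeout (30 in the question)
     * @param resetVisibilityOnFailure true models the negative acknowledge (timeout set to 0)
     */
    static long secondsUntilLastReceive(int maxReceives,
                                        long defaultVisibilitySeconds,
                                        boolean resetVisibilityOnFailure) {
        long now = 0;
        long visibleAt = 0;
        for (int receiveCount = 1; receiveCount <= maxReceives; receiveCount++) {
            // The consumer can only receive the message once it is visible.
            now = Math.max(now, visibleAt);
            // Receiving hides the message for the default visibility timeout.
            visibleAt = now + defaultVisibilitySeconds;
            // The listener throws; a negative acknowledge makes it visible again at once.
            if (resetVisibilityOnFailure) {
                visibleAt = now;
            }
        }
        return now;
    }

    public static void main(String[] args) {
        System.out.println(secondsUntilLastReceive(20, 30, true));  // prints 0
        System.out.println(secondsUntilLastReceive(20, 30, false)); // prints 570
    }
}
```

Under these assumptions, all 20 receives happen at time 0 when the timeout is reset on failure, whereas leaving the default timeout in place spreads them over 570 seconds (19 waits of 30 seconds each).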

The easiest way to override this behavior is to implement a CustomJmsListenerContainerFactory and a CustomMessageListenerContainer and use them instead of DefaultJmsListenerContainerFactory and DefaultMessageListenerContainer.

public class CustomMessageListenerContainer extends DefaultMessageListenerContainer {

    public CustomMessageListenerContainer() {
        super();
    }

    @Override
    protected void rollbackOnExceptionIfNecessary(Session session, Throwable ex) {
        // do nothing, so that "visibilityTimeout" will stay same
    }

}

public class CustomJmsListenerContainerFactory extends DefaultJmsListenerContainerFactory {
    
    @Override
    protected DefaultMessageListenerContainer createContainerInstance() {
        return new CustomMessageListenerContainer();
    }
}

Then register the factory as a Spring bean, either with @Component or the same way you did in JmsConfig:

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory factory = new CustomJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    // and set other stuff on factory
    return factory;
}

NOTE:
If your application also consumes from other JMS sources besides SQS, make sure to use a different container and container factory for them, so that rollbackOnExceptionIfNecessary keeps its default behavior there.

Kamasutra answered 16/10, 2020 at 18:15 Comment(3)
This worked as far as I have tested it. I am generally not a fan of overriding behavior like this, because of updates and such, but I really couldn't find any other way or even a mention of this. What am I breaking by overriding "rollbackOnExceptionIfNecessary"? – Demonolater
I mentioned this in the answer. You can take a look at the session.recover() link in the answer. It basically just negatively acknowledges the messages, which means setting their visibility timeouts to 0. – Kamasutra
@SedatGocken, on your website you mentioned that removing rollbackOnExceptionIfNecessary() can lead to failed messages being acknowledged. How can that happen? – Naranjo
