Spring - Validate incoming message in RabbitMQ listener

I am using the Spring Boot framework. I want to send an object from one service to another via RabbitMQ, like this:

Service A:

rabbitTemplate.convertAndSend("queue", createAccountRequestMessage);

Service B:

@RabbitListener(queues = "queue")
public void onAccountRequested(@Valid CreateAccountRequestMessage createAccountRequestMessage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException
{
    
}

In the CreateAccountRequestMessage class I have defined some validation annotations such as @NotEmpty and @NotNull, but when I send an invalid message from service A to service B, the @Valid annotation has no effect: the CreateAccountRequestMessage object is not validated before onAccountRequested is invoked.

Astonied answered 20/11, 2017 at 14:39 Comment(0)

You need to set the validator in DefaultMessageHandlerMethodFactory.

@Autowired
SmartValidator validator;

@Bean
public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
    DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
    factory.setValidator(this.validator);
    return factory;
}

Then you also need to specify the @Payload annotation along with the @Valid annotation.

@RabbitListener(queues = "queue")
public void onAccountRequested(@Valid @Payload CreateAccountRequestMessage 
    createAccountRequestMessage, Channel channel, 
    @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException
{

}

Now a MethodArgumentNotValidException will be thrown for an invalid payload, and the message will be discarded, or routed to a dead letter exchange if one is configured for the queue.
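For the dead letter option, the queue itself has to be declared with dead-letter arguments. The following is only a sketch of one way to do that with Spring AMQP's QueueBuilder; the exchange name "queue.dlx", the queue name "queue.dead" and the class name are hypothetical and not part of the original setup. It also assumes the application declares the queue itself; if "queue" already exists on the broker with different arguments, the declaration is expected to fail.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadLetterConfig {

    // Hypothetical names chosen for illustration only.
    static final String DLX = "queue.dlx";
    static final String DLQ = "queue.dead";

    // The listener's queue; rejected messages are forwarded to the DLX.
    @Bean
    public Queue mainQueue() {
        return QueueBuilder.durable("queue")
                .withArgument("x-dead-letter-exchange", DLX)
                .withArgument("x-dead-letter-routing-key", DLQ)
                .build();
    }

    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DLX);
    }

    // Queue that collects the messages that failed validation.
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DLQ).build();
    }

    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DLQ);
    }
}
```

With this in place, a message rejected without requeue should end up in "queue.dead" instead of being dropped, where it can be inspected later.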

Foulk answered 13/12, 2017 at 15:53 Comment(2)
It doesn't work for me. No qualifying bean of type 'org.springframework.validation.SmartValidator' available. - Appling
@Appling you are right about your answer below, the validator to use might vary depending on the app setup. - Foulk

I had the same problem. @Praveer's answer works well, except for the SmartValidator part. Here is my solution, which is inspired by this article: https://blog.trifork.com/2016/02/29/spring-amqp-payload-validation/

@Configuration
@EnableRabbit
@Slf4j
public class CmsMQConfig implements RabbitListenerConfigurer {

    @Value("${dw.rabbitmq.hosts}")
    private String hosts;

    @Value("${dw.rabbitmq.username}")
    private String username;

    @Value("${dw.rabbitmq.password}")
    private String password;

    @Value("${dw.rabbitmq.virtual-host}")
    private String virtualHost;

    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setMessageConverter(messageConverter());
        return factory;
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(hosts);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        return connectionFactory;
    }

    @Bean
    public Jackson2JsonMessageConverter messageConverter() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JodaModule());
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        return new Jackson2JsonMessageConverter(mapper);
    }

    @Bean
    public DefaultMessageHandlerMethodFactory defaultHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setValidator(amqpValidator());
        return factory;
    }

    @Bean
    public Validator amqpValidator() {
        return new OptionalValidatorFactoryBean();
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setContainerFactory(rabbitListenerContainerFactory());
        registrar.setMessageHandlerMethodFactory(defaultHandlerMethodFactory());
    }
}
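If only the validator needs to be customized, a shorter variant may be possible. The Spring AMQP reference documentation describes, from version 2.3.7 onward, setting the validator directly on the registrar instead of building a DefaultMessageHandlerMethodFactory. The sketch below assumes such a version; the class name ListenerValidationConfig is hypothetical.

```java
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.validation.Validator;
import org.springframework.validation.beanvalidation.OptionalValidatorFactoryBean;

@Configuration
public class ListenerValidationConfig implements RabbitListenerConfigurer {

    // Same validator as in the configuration above.
    @Bean
    public Validator amqpValidator() {
        return new OptionalValidatorFactoryBean();
    }

    // Assumes Spring AMQP 2.3.7+, where the registrar accepts a validator.
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setValidator(amqpValidator());
    }
}
```

The listener method still needs @Valid together with @Payload on the argument for the validator to be applied.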

Appling answered 24/3, 2021 at 10:31 Comment(0)

I had a similar problem.

My message was defined as such:

import jakarta.validation.constraints.NotBlank;

public class ValidatedBook {
    @NotBlank
    public String name;
}

I sent messages like these to the proper queue:

{
"name":"asd"   # should pass
}

{
"name":""      # should fail
}

{
               # should fail
}

And I had listener defined like this:

import jakarta.validation.Valid;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class QueueReceiver {

    @RabbitListener(queues = "queue", errorHandler = "errorHandler")
    public void receive(@Valid @Payload ValidatedBook book) {}
}

This did not validate the ValidatedBook object; messages always passed validation.

I confirmed that validation should fail by validating the object manually:

    ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
    Validator validator = factory.getValidator();
    Set<ConstraintViolation<ValidatedBook>> things = validator.validate(book);

The book was simply not valid when name was empty or null, yet @Valid did nothing!

The solution to my problem was adding the @Validated annotation at class level:

@Component
@Validated
public class QueueReceiver {

    @RabbitListener(queues = "queue", errorHandler = "errorHandler")
    void receiver(@Valid @Payload ValidatedBook object) {}
}

Now the messages that should fail are rejected with a ConstraintViolationException before the listener method is entered, and I can easily catch that exception in my error handler class (one that implements RabbitListenerErrorHandler).

Originative answered 2/5, 2023 at 19:52 Comment(0)