Override Boot's listener container factory bean, as described in Enable Listener Endpoint Annotations.
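@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setErrorHandler(errorHandler()); // your custom ErrorHandler bean
    return factory;
}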
You can inject a custom implementation of ErrorHandler, which will be added to each listener container the factory creates.
void handleError(Throwable t);
The throwable will be a ListenerExecutionFailedException which, starting with version 1.6.7 (Boot 1.4.4), has the raw inbound message in its failedMessage property.
The default error handler considers causes such as MessageConversionException to be fatal (they will not be requeued).
If you wish to retain that behavior (normal for such problems), you should throw an AmqpRejectAndDontRequeueException after handling the error.
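To make the reject-versus-requeue decision concrete, here is a minimal, library-free sketch of the idea. The class names (ConversionFailure, ListenerFailure, FatalCauseSketch) are hypothetical stand-ins and not Spring AMQP types; a real handler would implement ErrorHandler and throw AmqpRejectAndDontRequeueException where this sketch returns "reject".

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a message conversion error (not a Spring AMQP class).
class ConversionFailure extends RuntimeException {
    ConversionFailure(String message) {
        super(message);
    }
}

// Hypothetical stand-in for the wrapper the container passes to the error handler.
class ListenerFailure extends RuntimeException {
    ListenerFailure(String message, Throwable cause) {
        super(message, cause);
    }
}

class FatalCauseSketch {

    // Cause types that redelivery cannot fix; the list is an assumption for this sketch.
    private static final List<Class<? extends Throwable>> FATAL_CAUSES =
            Arrays.asList(ConversionFailure.class, ClassCastException.class);

    // Fatal only when a listener failure wraps one of the unrecoverable causes.
    static boolean isFatal(Throwable t) {
        if (!(t instanceof ListenerFailure) || t.getCause() == null) {
            return false;
        }
        Throwable cause = t.getCause();
        return FATAL_CAUSES.stream().anyMatch(type -> type.isInstance(cause));
    }

    // "reject" means discard (or dead-letter) the message; "requeue" means redeliver it.
    static String disposition(Throwable t) {
        return isFatal(t) ? "reject" : "requeue";
    }
}
```

A bad-JSON failure maps to "reject", while something transient such as a database outage maps to "requeue", so a poison message does not loop forever.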
By the way, you don't need that RabbitTemplate bean; if you have just one MessageConverter bean in the application, Boot will auto-wire it into the containers and template.
Since you will be overriding Boot's factory, you will have to wire in the converter there.
You could use the default ConditionalRejectingErrorHandler, but inject it with a custom implementation of FatalExceptionStrategy. In fact, you could subclass its DefaultExceptionStrategy and override isFatal(Throwable t); then, after handling the error, return super.isFatal(t).
Full example; sends 1 good message and 1 bad one:
package com.example;

import org.slf4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.util.ErrorHandler;

@SpringBootApplication
public class So42215050Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
        context.getBean(So42215050Application.class).runDemo();
        context.close();
    }

    @Autowired
    private RabbitTemplate template;

    private void runDemo() throws Exception {
        this.template.convertAndSend(queue().getName(), new Foo("bar"));
        // Swap the converted body for invalid JSON to force a conversion failure.
        this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> {
            return new Message("some bad json".getBytes(), m.getMessageProperties());
        });
        Thread.sleep(5000); // give the listener time to consume both messages
    }

    @RabbitListener(queues = "So42215050")
    public void handle(Foo in) {
        System.out.println("Received: " + in);
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonConverter());
        factory.setErrorHandler(errorHandler());
        return factory;
    }

    @Bean
    public ErrorHandler errorHandler() {
        return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
    }

    @Bean
    public Queue queue() {
        return new Queue("So42215050", false, false, true);
    }

    @Bean
    public MessageConverter jsonConverter() {
        return new Jackson2JsonMessageConverter();
    }

    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

        private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

        @Override
        public boolean isFatal(Throwable t) {
            if (t instanceof ListenerExecutionFailedException) {
                ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
                logger.error("Failed to process inbound message from queue "
                        + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                        + "; failed message: " + lefe.getFailedMessage(), t);
            }
            return super.isFatal(t); // keep the default fatal/non-fatal decision
        }
    }

    public static class Foo {
        private String foo;
        public Foo() {
        }
        public Foo(String foo) {
            this.foo = foo;
        }
        public String getFoo() {
            return this.foo;
        }
        public void setFoo(String foo) {
            this.foo = foo;
        }
        @Override
        public String toString() {
            return "Foo [foo=" + this.foo + "]";
        }
    }
}
Received: Foo [foo=bar]
2017-02-14 09:42:50.972 ERROR 44868 --- [cTaskExecutor-1] 5050Application$MyFatalExceptionStrategy : Failed to process inbound message from queue So42215050; failed message: (Body:'some bad json' MessageProperties [headers={TypeId=com.example.So42215050Application$Foo}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=So42215050, receivedDelay=null, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-P2QqY0PMD1ppX5NnkUPhFA, consumerQueue=So42215050])
JSON does not convey any type information. By default, the type to convert to will be inferred from the method parameter type. If you wish to reject anything that can't be converted to that type, you need to configure the message converter appropriately.
For example:
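@Bean
public MessageConverter jsonConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    DefaultClassMapper mapper = new DefaultClassMapper();
    mapper.setDefaultType(Foo.class); // used when the message has no __TypeId__ header
    converter.setClassMapper(mapper);
    return converter;
}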
Now, when I change my example to send a Bar instead of a Foo:
public static class Bar {
    // same field and accessors as Foo; the full class is in the listing at the end
}
this.template.convertAndSend(queue().getName(), new Bar("baz"));
I get:
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message
... 13 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.So42215050Application$Bar] to [com.example.So42215050Application$Foo] for GenericMessage [payload=Bar [foo=baz], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=So42215050, amqp_contentEncoding=UTF-8, amqp_deliveryTag=3, amqp_consumerQueue=So42215050, amqp_redelivered=false, id=6d7e23a3-c2a7-2417-49c9-69e3335aa485, amqp_consumerTag=amq.ctag-6JIGkpmkrTKaG32KVpf8HQ, contentType=application/json, __TypeId__=com.example.So42215050Application$Bar, timestamp=1488489538017}]
But this only works if the sender sets the __TypeId__ header (which the template does if it's configured with the same converter).
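The way a class mapper picks the target type from the __TypeId__ header can be sketched without any Spring classes. This is only a simplified model under stated assumptions: TypeIdSketch, map and resolve are hypothetical names, and throwing for an unmapped id is a simplification made for the sketch, not a claim about how DefaultClassMapper treats that case.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of header-based type resolution (not a Spring AMQP class).
class TypeIdSketch {

    static final String TYPE_ID_HEADER = "__TypeId__";

    private final Map<String, Class<?>> idMappings = new HashMap<>();
    private final Class<?> defaultType;

    TypeIdSketch(Class<?> defaultType) {
        this.defaultType = defaultType;
    }

    // Register a short type id, e.g. "foo", for a target class.
    void map(String id, Class<?> type) {
        this.idMappings.put(id, type);
    }

    // No header: fall back to the default type. Known id: use the mapped class.
    Class<?> resolve(Map<String, Object> headers) {
        Object id = headers.get(TYPE_ID_HEADER);
        if (id == null) {
            return this.defaultType;
        }
        Class<?> mapped = this.idMappings.get(id.toString());
        if (mapped == null) {
            // Simplification for this sketch only.
            throw new IllegalArgumentException("No mapping for type id: " + id);
        }
        return mapped;
    }
}
```

With a default type plus mappings for "foo" and "bar", a message without the header resolves to the default, "foo" resolves to its mapped class, and "bar" resolves to whatever it was mapped to, which may not be assignable to the listener's parameter type.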
// Imports as in the first listing, plus MessageBuilder and MessagePropertiesBuilder (org.springframework.amqp.core),
// DefaultClassMapper (org.springframework.amqp.support.converter), and java.util.HashMap / java.util.Map.
@SpringBootApplication
public class So42215050Application {

    private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
        context.getBean(So42215050Application.class).runDemo();
        context.close();
    }

    @Autowired
    private RabbitTemplate template;

    private void runDemo() throws Exception {
        this.template.convertAndSend(queue().getName(), new Foo("bar")); // good - converter sets up type
        this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> {
            return new Message("some bad json".getBytes(), m.getMessageProperties()); // fail bad json
        });
        // Raw JSON message with no __TypeId__ header; the body shown here is an assumed payload.
        Message message = MessageBuilder.withBody("{\"foo\":\"bar\"}".getBytes())
                .andProperties(MessagePropertiesBuilder.newInstance().setContentType("application/json").build())
                .build();
        this.template.send(queue().getName(), message); // Success - default Foo class when no header
        message.getMessageProperties().setHeader("__TypeId__", "foo");
        this.template.send(queue().getName(), message); // Success - foo is mapped to Foo
        message.getMessageProperties().setHeader("__TypeId__", "bar");
        this.template.send(queue().getName(), message); // fail - mapped to a Map
        Thread.sleep(5000); // give the listener time to consume the messages
    }

    @RabbitListener(queues = "So42215050")
    public void handle(Foo in) {
        logger.info("Received: " + in);
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonConverter());
        factory.setErrorHandler(errorHandler());
        return factory;
    }

    @Bean
    public ErrorHandler errorHandler() {
        return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
    }

    @Bean
    public Queue queue() {
        return new Queue("So42215050", false, false, true);
    }

    @Bean
    public MessageConverter jsonConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        DefaultClassMapper mapper = new DefaultClassMapper();
        mapper.setDefaultType(Foo.class);
        Map<String, Class<?>> mappings = new HashMap<>();
        mappings.put("foo", Foo.class);
        mappings.put("bar", Object.class);
        mapper.setIdClassMapping(mappings);
        converter.setClassMapper(mapper);
        return converter;
    }

    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

        private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

        @Override
        public boolean isFatal(Throwable t) {
            if (t instanceof ListenerExecutionFailedException) {
                ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
                logger.error("Failed to process inbound message from queue "
                        + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                        + "; failed message: " + lefe.getFailedMessage(), t);
            }
            return super.isFatal(t); // keep the default fatal/non-fatal decision
        }
    }

    public static class Foo {
        private String foo;
        public Foo() {
        }
        public Foo(String foo) {
            this.foo = foo;
        }
        public String getFoo() {
            return this.foo;
        }
        public void setFoo(String foo) {
            this.foo = foo;
        }
        @Override
        public String toString() {
            return "Foo [foo=" + this.foo + "]";
        }
    }

    public static class Bar {
        private String foo;
        public Bar() {
        }
        public Bar(String foo) {
            this.foo = foo;
        }
        public String getFoo() {
            return this.foo;
        }
        public void setFoo(String foo) {
            this.foo = foo;
        }
        @Override
        public String toString() {
            return "Bar [foo=" + this.foo + "]";
        }
    }
}