How can I retry failure messages from kafka?

My spring-boot application (consumer) processes messages from Apache Kafka. Occasionally a message can't be processed and the consumer throws an exception. The consumer commits the offset anyway. Can I distinguish successful messages from failed messages in Kafka? I think I can't. Is that true? If it is, I have the main question:

How can I retry failed messages? I know a few approaches, but I'm not sure they are correct:

1) Rewind the offset to an earlier position. But then successful messages will be retried too.

2) When I catch an exception, send the message to another topic (an error topic, for example). But that looks complicated.

3) Something else (your suggestion)

Zenithal answered 8/4, 2019 at 8:9 Comment(1)
What do you mean with "Consumer commits offset anyway."? Do you have spring.kafka.consumer.enable-auto-commit enabled?Connivent

If you want an at-least-once guarantee, a general pattern is as follows (a minimal sketch follows the list):

  • Disable auto commit (set enable.auto.commit to false)
  • Consume messages
  • For each message:

    • If no errors, then commit offset
    • If error, retry as many times as you wish
    • If successful, commit
    • If you want to give up, log or publish to an error queue (for analysis or later retry)
  • Repeat
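
For illustration, here is a minimal sketch of this loop with the plain Kafka consumer API and manual commits; the topic name, the bounded retry count, and the process/sendToErrorTopic helpers are assumptions for the example, not prescribed by the answer:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ManualCommitConsumer {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // step 1: disable auto commit
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        boolean processed = false;
                        for (int attempt = 1; attempt <= 3 && !processed; attempt++) { // bounded retries
                            try {
                                process(record);          // your business logic
                                processed = true;
                            } catch (Exception e) {
                                // log and retry; after the last attempt we give up below
                            }
                        }
                        if (!processed) {
                            sendToErrorTopic(record);     // give up: publish for analysis or later retry
                        }
                        // commit the offset of the record we just handled (success or gave up)
                        consumer.commitSync(Collections.singletonMap(
                                new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1)));
                    }
                }
            }
        }

        private static void process(ConsumerRecord<String, String> record) { /* business logic */ }

        private static void sendToErrorTopic(ConsumerRecord<String, String> record) { /* e.g. a KafkaProducer.send(...) */ }
    }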

Jasonjasper answered 8/4, 2019 at 13:16 Comment(3)
If I retry as many times as I wish, I can't process other messages and the backlog in my topic will grow.Zenithal
A simple retry is not very useful. Chances are that immediate retries will fail as well (say, because an external API is not available). For retries, you should also think about using a retry topic.Absher
"If I retry as many times as I wish" -> then set retry=0Jasonjasper

Use a SeekToCurrentErrorHandler. It will reposition the offset to replay the message (10 times by default, but configurable).

After the retries are exhausted, it calls a "recoverer" that can take some action, such as the DeadLetterPublishingRecoverer.
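
A sketch of how this could be wired up, assuming Spring Kafka 2.3+ (where SeekToCurrentErrorHandler accepts a BackOff) and existing ConsumerFactory and KafkaTemplate beans; the retry interval and count are illustrative:

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
    import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
    import org.springframework.util.backoff.FixedBackOff;

    @Configuration
    public class KafkaErrorHandlingConfig {

        @Bean
        public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
                ConsumerFactory<Object, Object> consumerFactory, KafkaTemplate<Object, Object> template) {
            ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            // After retries are exhausted, publish the failed record to <topic>.DLT
            DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
            // Replay the failed record up to 2 more times, 1 second apart, then call the recoverer
            factory.setErrorHandler(new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(1000L, 2)));
            return factory;
        }
    }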

Bigley answered 8/4, 2019 at 14:4 Comment(2)
Thanks @gary-russell. What if I want to implement some kind of requeue behaviour? For instance, my app processes 20 messages concurrently and then I want to requeue the 1st, 5th, and 7th messages. Should I explicitly publish the messages to the same topic, or can I somehow ack those specific messages?Crossfertilization
The DLPR can be configured to publish back to the same queue. You should not process records concurrently on a single consumer. Kafka only maintains an offset. Use partitions to achieve concurrency.Bigley

You can make the following changes in your consumer config:

  1. enable.auto.commit=false

  2. RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(retryInterval);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(retryMaxCount));
    
        return retryTemplate;
    }
    
  3. In your kafkaListenerContainerFactory:

    factory.setRetryTemplate(retryTemplate);
    factory.getContainerProperties().setAckOnError(true);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    

Now, in your consumer code, just rethrow the exception whenever processing fails. The offset will not be updated when an exception occurs, and the listener will be retried after retryInterval, up to retryMaxCount times.
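
For example, a listener under this configuration might look like the sketch below; the topic name and processing logic are assumptions:

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class MyConsumer {

        @KafkaListener(topics = "my-topic")
        public void listen(String message) {
            // Any exception thrown here triggers the RetryTemplate configured above;
            // once retryMaxCount attempts are exhausted, the exception propagates to the container.
            process(message);
        }

        private void process(String message) { /* business logic that may throw */ }
    }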

If you want to ignore (not retry) certain exception types, create a map of exceptions as below and pass it to SimpleRetryPolicy as shown:

Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
exceptionMap.put(IllegalArgumentException.class, false); // do not retry
exceptionMap.put(TimeoutException.class, true);          // retry

retryTemplate.setRetryPolicy(new SimpleRetryPolicy(retryMaxCount, exceptionMap));

For more detail visit this link: Kafka Error Handling

Neoclassicism answered 29/4, 2021 at 10:13 Comment(0)

You can try the non-blocking retry mechanism that Spring Kafka provides through the @RetryableTopic annotation, which offers many configuration options. With the default settings it makes three attempts; if the exception still occurs, the message is published to a dead-letter topic (DLT). You can consume from that topic by adding @DltHandler to a method and implementing your dead-letter logic there. The @DltHandler method should be in the same class as the @KafkaListener and @RetryableTopic methods.
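
A minimal sketch, assuming an "orders" topic with String payloads (the attempt count and backoff values are illustrative, not defaults):

    import org.springframework.kafka.annotation.DltHandler;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.annotation.RetryableTopic;
    import org.springframework.retry.annotation.Backoff;
    import org.springframework.stereotype.Component;

    @Component
    public class OrderListener {

        // Retries happen on separate retry topics, so the main topic is not blocked
        @RetryableTopic(attempts = "4", backoff = @Backoff(delay = 1000, multiplier = 2.0))
        @KafkaListener(topics = "orders")
        public void listen(String message) {
            // Throwing here sends the record on to the next retry topic (and eventually the DLT)
            process(message);
        }

        @DltHandler
        public void handleDlt(String message) {
            // Called after all attempts are exhausted; put your dead-letter logic here
        }

        private void process(String message) { /* business logic that may throw */ }
    }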

Devol answered 7/8, 2023 at 12:18 Comment(0)
