Exponential backoff with message order guarantee using spring-kafka

I'm trying to implement a Spring Boot-based Kafka consumer that has some very strong message delivery guarantees, even in the case of an error:

  • messages from a partition must be processed in order,
  • if message processing fails, the consumption of the particular partition should be suspended,
  • the processing should be retried with a backoff, until it succeeds.

Our current implementation fulfills these requirements:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  factory.setRetryTemplate(retryTemplate());

  final ContainerProperties containerProperties = factory.getContainerProperties();
  containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
  containerProperties.setErrorHandler(errorHandler());

  return factory;
}

@Bean
public RetryTemplate retryTemplate() {

  final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
  backOffPolicy.setInitialInterval(1000);
  backOffPolicy.setMultiplier(1.5);

  final RetryTemplate template = new RetryTemplate();
  template.setRetryPolicy(new AlwaysRetryPolicy()); // retry forever; the consumer thread blocks between attempts
  template.setBackOffPolicy(backOffPolicy);

  return template;
}

@Bean
public ErrorHandler errorHandler() {
  // re-seeks the unprocessed records so they are redelivered on the next poll
  return new SeekToCurrentErrorHandler();
}

However, here the consumer is stuck on the failing record forever. At some point, the retrying will exceed max.poll.interval.ms and the broker will reassign the partition to another consumer, thus creating a duplicate.

Assuming max.poll.interval.ms equal to 5 minutes (the default) and the failure lasting 30 minutes, this will cause the message to be processed ca. 6 times (one rebalance roughly every 5 minutes: 30 / 5 = 6).
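
For reference, a minimal sketch of the property involved (the property name and its 5-minute default are real; the map is otherwise illustrative):

Map<String, Object> consumerProps = new HashMap<>();
// if the poll loop is blocked longer than this (e.g. by in-memory retries), the
// group coordinator revokes the partition and another consumer re-reads the
// same record - hence the duplicates described above
consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000); // 5 minutes, the default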

Another possibility is to return the messages to the queue after N retries (e.g. 3 attempts) by using SimpleRetryPolicy. Then the message will be replayed (thanks to SeekToCurrentErrorHandler) and the processing will start from scratch, again up to N attempts. This results in delays forming a series, e.g.

10 secs -> 30 secs -> 90 secs -> 10 secs -> 30 secs -> 90 secs -> ...

which is less desirable than a constantly rising one :)
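
A minimal sketch of this second scenario, assuming the intervals from the series above (SimpleRetryPolicy and ExponentialBackOffPolicy are the actual Spring Retry API):

@Bean
public RetryTemplate boundedRetryTemplate() {
  final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
  backOffPolicy.setInitialInterval(10_000); // 10 secs
  backOffPolicy.setMultiplier(3.0);         // 10 -> 30 -> 90 secs

  final RetryTemplate template = new RetryTemplate();
  // 4 attempts = 3 backoffs; afterwards SeekToCurrentErrorHandler replays the
  // record and the sequence starts again from 10 secs
  template.setRetryPolicy(new SimpleRetryPolicy(4));
  template.setBackOffPolicy(backOffPolicy);

  return template;
}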

Is there any third scenario which could keep the delays forming an ascending series and, at the same time, not create duplicates in the aforementioned example?

Acciaccatura answered 1/2, 2018 at 15:54 Comment(1)
But if the number of consumers equals the number of topics, this consumer becomes idle, and Kafka can't assign the partition to another consumer because all the other consumers are busy (one thread per consumer). – Chordate

It can be done with stateful retry, in which case the exception is thrown after each retry attempt, but state is maintained in the retry state object, so the next delivery of that message will use the next delay, etc.

This requires something in the message (e.g. a header) to uniquely identify each message. Fortunately, with Kafka, the topic, partition and offset provide that unique key for the state.

However, currently, the RetryingMessageListenerAdapter does not support stateful retry.

You could disable retry in the listener container factory and use a stateful RetryTemplate in your listener, using one of the execute methods that takes a RetryState argument.
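
A minimal sketch of that variant (process and recover are hypothetical placeholders for your own code; the three-argument execute and DefaultRetryState are the actual Spring Retry API):

// the key ties the retry state to this particular record across redeliveries
RetryState retryState = new DefaultRetryState(topic + "-" + partition + "-" + offset);

retryTemplate.execute(
    context -> process(record),  // RetryCallback: throwing triggers the next stateful attempt
    context -> recover(record),  // RecoveryCallback: runs once retries are exhausted
    retryState);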

Feel free to add a GitHub issue for the framework to support stateful retry; contributions are welcome! - pull request issued.

EDIT

I just wrote a test case to demonstrate using stateful recovery with a @KafkaListener...

/*
 * Copyright 2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.annotation;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.RetryState;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.DefaultRetryState;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author Gary Russell
 * @since 5.0
 *
 */
@RunWith(SpringRunner.class)
@DirtiesContext
public class StatefulRetryTests {

    private static final String DEFAULT_TEST_GROUP_ID = "statefulRetry";

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, "sr1");

    @Autowired
    private Config config;

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @Test
    public void testStatefulRetry() throws Exception {
        this.template.send("sr1", "foo");
        assertThat(this.config.listener1().latch1.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(this.config.listener1().latch2.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(this.config.listener1().result).isTrue();
    }

    @Configuration
    @EnableKafka
    public static class Config {

        @Bean
        public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
            return factory;
        }

        @Bean
        public DefaultKafkaConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> consumerProps =
                    KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", embeddedKafka);
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return consumerProps;
        }

        @Bean
        public KafkaTemplate<Integer, String> template() {
            KafkaTemplate<Integer, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
            return kafkaTemplate;
        }

        @Bean
        public ProducerFactory<Integer, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        @Bean
        public Map<String, Object> producerConfigs() {
            return KafkaTestUtils.producerProps(embeddedKafka);
        }

        @Bean
        public Listener listener1() {
            return new Listener();
        }

    }

    public static class Listener {

        private static final RetryTemplate retryTemplate = new RetryTemplate();

        private static final ConcurrentMap<String, RetryState> states = new ConcurrentHashMap<>();

        static {
            ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
            retryTemplate.setBackOffPolicy(backOff);
        }

        private final CountDownLatch latch1 = new CountDownLatch(3);

        private final CountDownLatch latch2 = new CountDownLatch(1);

        private volatile boolean result;

        @KafkaListener(topics = "sr1", groupId = "sr1")
        public void listen1(final String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                @Header(KafkaHeaders.OFFSET) long offset) {
            // delimit the parts so e.g. (partition 1, offset 23) and
            // (partition 12, offset 3) cannot collide on the same key
            String recordKey = topic + "-" + partition + "-" + offset;
            RetryState retryState = states.get(recordKey);
            if (retryState == null) {
                retryState = new DefaultRetryState(recordKey);
                states.put(recordKey, retryState);
            }
            this.result = retryTemplate.execute(c -> {

                // do your work here

                this.latch1.countDown();
                throw new RuntimeException("retry");
            }, c -> {
                // recovery callback: invoked only when retries are exhausted
                latch2.countDown();
                return true;
            }, retryState);
            states.remove(recordKey);
        }

    }

}

and

Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void org.springframework.kafka.annotation.StatefulRetryTests$Listener.listen1(java.lang.String,java.lang.String,int,long)' threw exception; nested exception is java.lang.RuntimeException: retry

after each delivery attempt.

In this case, I added a recoverer to handle the message after retries are exhausted. You could do something else, like stop the container (but do that on a separate thread, like we do in the ContainerStoppingErrorHandler).
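
For example, a sketch of stopping the container from a recovery callback (listenerId is an assumed @KafkaListener id; KafkaListenerEndpointRegistry and SimpleAsyncTaskExecutor are the actual Spring APIs):

@Autowired
private KafkaListenerEndpointRegistry registry;

private void stopContainer(String listenerId) {
    // stop() must not run on the consumer thread itself, since stop() waits for
    // that thread to exit the listener - the same reason the
    // ContainerStoppingErrorHandler hands the call off to its own executor
    new SimpleAsyncTaskExecutor().execute(() -> registry.getListenerContainer(listenerId).stop());
}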

Karlotta answered 1/2, 2018 at 17:47 Comment(5)
I added an example of using stateful recovery within the listener. – Karlotta
When using stateful retry, where is the state persisted by default? When using the ExponentialBackOffPolicy on the RetryTemplate, I am guessing the state of the message would have to be checked to find the next backoff period. Please excuse my ignorance of how this works, as I am new to this. – Bromleigh
You should ask a new question rather than commenting on an old one. The reason for using stateful retry is to prevent exceeding max.poll.interval.ms and so avoid a rebalance. The SeekToCurrentErrorHandler resets the offsets so the unprocessed records are re-fetched on the next poll; the failed record is then immediately redelivered and the thread suspended for the next backOff. So some state (the offset) is kept in Kafka, and some state (retry attempts, backOff) is kept in memory, not persisted. If the app crashes, the offset is retained, but the retry state will start again from the beginning. – Karlotta
@GaryRussell Thank you, I was looking for this solution. Could you please explain the need for the latch, and what you mean by "retries are exhausted"? Is it that the exponential backoff will be reset within 30 seconds? Also, are you referring to the SeekToCurrentErrorHandler as the recoverer in your example? – Pekingese
This is an old answer; don't ask new questions here. Things have moved on since then; retry and backoff have now been added to the STCEH, so retry at the listener level is no longer needed. See the documentation; if you still have issues, ask a new question. "Now that the SeekToCurrentErrorHandler can be configured with a BackOff and has the ability to retry only certain exceptions (since version 2.3), the use of stateful retry, via the listener adapter retry configuration, is no longer necessary. ..." – Karlotta
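
For reference, a sketch of the newer approach that comment describes (spring-kafka 2.3+; the recoverer lambda and log are illustrative):

factory.setErrorHandler(new SeekToCurrentErrorHandler(
    (record, exception) -> log.error("Retries exhausted for {}", record, exception), // recoverer
    new ExponentialBackOff(1000L, 1.5))); // the error handler itself now applies the backoff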
