Creating a new consumer when a consumer stops due to an error in Reactor Kafka

I am working on an application that runs multiple consumers, one per topic partition, so that the topic is read concurrently. I followed this link to ensure that a consumer gets created again if the existing consumer stops; .repeat is supposed to create the new consumer. I have been trying to test this scenario.

Below is my code along with test:

@Bean
public ReceiverOptions<String, String> kafkaReceiverOptions(String topic, KafkaProperties kafkaProperties) {
    ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
    return basicReceiverOptions.subscription(Collections.singletonList(topic))
            .addAssignListener(receiverPartitions -> log.debug("onPartitionAssigned {}", receiverPartitions))
            .addRevokeListener(receiverPartitions -> log.debug("onPartitionsRevoked {}", receiverPartitions));
}

@Bean
public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(ReceiverOptions<String, String> kafkaReceiverOptions) {
    return new ReactiveKafkaConsumerTemplate<String, String>(kafkaReceiverOptions);
}

@Bean
public ReactiveKafkaProducerTemplate<String, List<Object>> kafkaProducerTemplate(
        KafkaProperties properties) {
    Map<String, Object> props = properties.buildProducerProperties();
    return new ReactiveKafkaProducerTemplate<String, List<Object>>(SenderOptions.create(props));
}


public void run(String... args) {
    // Start one consumer pipeline per topic partition.
    for (int i = 0; i < topicPartitionsCount; i++) {
        readWrite(destinationTopic).subscribe();
    }
}


public Flux<String> readWrite(String destTopic) {
        AtomicBoolean repeatConsumer = new AtomicBoolean(false);
        return kafkaConsumerTemplate
                .receiveAutoAck()
                .doOnNext(consumerRecord -> log.debug("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
                //.doOnNext(consumerRecord -> log.info("Record received from partition {} in thread {}", consumerRecord.partition(),Thread.currentThread().getName()))
                .doOnNext(s-> sendToKafka(s,destinationTopic))
                .map(ConsumerRecord::value)
                .doOnNext(record -> log.debug("successfully consumed {}={}", Metric[].class.getSimpleName(), record))
                .doOnError(exception -> log.debug("Error occurred while processing the message, attempting retry. Error message: {}", exception.getMessage()))
                .retryWhen(Retry.backoff(Integer.parseInt(retryAttempts), Duration.ofSeconds(Integer.parseInt(retryAttemptsDelay))).transientErrors(true))
                .onErrorContinue((exception,errorConsumerRecord)->{
                    ReceiverRecordException recordException = (ReceiverRecordException)exception;
                    log.debug("Retries exhausted for : {}", recordException);
                    recordException.getRecord().receiverOffset().acknowledge();
                    repeatConsumer.set(true);
                })
                .repeat(repeatConsumer::get); // will create a new consumer if the existing consumer stops
    }


public class ReceiverRecordException extends RuntimeException {
    private final ReceiverRecord record;

    ReceiverRecordException(ReceiverRecord record, Throwable t) {
        super(t);
        this.record = record;
    }

    public ReceiverRecord getRecord() {
        return this.record;
    }
}

Test:

@Test
public void readWriteCreatesNewConsumerWhenCurrentConsumerStops() {
    AtomicInteger recordNumber = new AtomicInteger(0);
    Mockito
            .when(reactiveKafkaConsumerTemplate.receiveAutoAck())
            .thenReturn(
                    Flux.create(consumerRecordFluxSink -> {
                        if (recordNumber.getAndIncrement() < 5) {
                            consumerRecordFluxSink.error(new RuntimeException("Kafka down"));
                        } else {
                            consumerRecordFluxSink.next(createConsumerRecord(validMessage));
                            consumerRecordFluxSink.complete();
                        }
                    })
            );

    Flux<String> actual = service.readWrite();

    StepVerifier.create(actual)
            .verifyComplete();

}

When I run the test, I get a retry-exhausted exception: onError(reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3 in a row (3 total)))

My understanding was that onErrorContinue would catch the exception and then continue with the next records, but it looks like the exception is propagated instead. If an exception is thrown, how does repeat() work? I would really appreciate it if someone could help me understand how to test this scenario.

Gramme answered 28/10, 2021 at 23:12

Comments (9)
Would you mind sharing exactly which exception you got? - Diversified
I just updated the OP with the exception. I get the record retry exception: onError(reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3 in a row (3 total))) - Gramme
And you do the cast (ReceiverRecordException)exception in the onErrorContinue(), while the exception that is emitted is a RetryExhaustedException, which is definitely not a ReceiverRecordException... - Diversified
Yes, I added the ReceiverRecordException class to the OP as well. I realized I had missed it. - Gramme
Does it work now? Can you summarize it as an answer to your own question? - Diversified
I am sorry, I think I misunderstood. I already had the cast (ReceiverRecordException)exception in onErrorContinue() and I still get the error. - Gramme
But you said that the exception is a RetryExhaustedException, in which case that cast would lead to a ClassCastException, since a RetryExhaustedException is not a ReceiverRecordException. That's my point: your onErrorContinue() is a bit wrong. - Diversified
Can you help me understand what is wrong in onErrorContinue()? It looks correct to me and shouldn't be throwing the retry exception. I cannot figure out what I am missing here. - Gramme
See the onErrorContinue() JavaDocs: it does not work in all cases. I'm not sure whether it is ignored for your StepVerifier or something similar, but it is better to consider onErrorResume() instead. The repeat() will do the trick of coming back to the consumer. - Diversified
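
To illustrate the cast problem raised in the comments, here is a minimal plain-Java sketch with no Reactor involved. RecordException is a hypothetical stand-in for ReceiverRecordException, and a plain IllegalStateException stands in for the retry-exhausted error; the class name, the handler methods, and the returned strings are all assumptions made for the example, not code from the question.

```java
public class ErrorHandlerSketch {

    // Hypothetical stand-in for the question's ReceiverRecordException:
    // an error that is tied to one specific record.
    static class RecordException extends RuntimeException {
        final String recordId;

        RecordException(String recordId, Throwable cause) {
            super(cause);
            this.recordId = recordId;
        }
    }

    // Mirrors the unconditional cast in the question's error handler.
    // Throws ClassCastException for any error that is not a RecordException.
    static String handleUnsafe(Throwable error) {
        RecordException recordException = (RecordException) error;
        return "skip " + recordException.recordId;
    }

    // Checks the type first, so errors that carry no record take a different path.
    static String handleSafe(Throwable error) {
        if (error instanceof RecordException) {
            return "skip " + ((RecordException) error).recordId;
        }
        return "restart consumer";
    }

    public static void main(String[] args) {
        Throwable perRecord = new RecordException("r-1", new RuntimeException("bad payload"));
        // Assumption: stands in for the retry-exhausted error seen in the test.
        Throwable exhausted = new IllegalStateException("Retries exhausted: 3/3");

        System.out.println(handleSafe(perRecord));   // prints "skip r-1"
        System.out.println(handleSafe(exhausted));   // prints "restart consumer"

        try {
            handleUnsafe(exhausted);
        } catch (ClassCastException e) {
            System.out.println("unsafe handler threw ClassCastException");
        }
    }
}
```

In the question's pipeline the same kind of type check would have to come before any call to getRecord(). Whether onErrorContinue() or onErrorResume() is the right operator for the handler is a separate matter, which the last comment addresses.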
