I am working on an application where I have multiple consumers for each topic partition, so the topic is read concurrently. I followed this link to ensure that a consumer gets created again if the existing consumer stops: .repeat will create the new consumer. I have been trying to test this scenario.
Below is my code, along with the test:
@Bean
public ReceiverOptions<String, String> kafkaReceiverOptions(String topic, KafkaProperties kafkaProperties) {
    ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
    return basicReceiverOptions.subscription(Collections.singletonList(topic))
            .addAssignListener(receiverPartitions -> log.debug("onPartitionAssigned {}", receiverPartitions))
            .addRevokeListener(receiverPartitions -> log.debug("onPartitionsRevoked {}", receiverPartitions));
}

@Bean
public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(ReceiverOptions<String, String> kafkaReceiverOptions) {
    return new ReactiveKafkaConsumerTemplate<String, String>(kafkaReceiverOptions);
}

@Bean
public ReactiveKafkaProducerTemplate<String, List<Object>> kafkaProducerTemplate(
        KafkaProperties properties) {
    Map<String, Object> props = properties.buildProducerProperties();
    return new ReactiveKafkaProducerTemplate<String, List<Object>>(SenderOptions.create(props));
}

public void run(String... args) {
    for (int i = 0; i < topicPartitionsCount; i++) {
        readWrite(destinationTopic).subscribe();
    }
}

public Flux<String> readWrite(String destTopic) {
    AtomicBoolean repeatConsumer = new AtomicBoolean(false);
    return kafkaConsumerTemplate
            .receiveAutoAck()
            .doOnNext(consumerRecord -> log.debug("received key={}, value={} from topic={}, offset={}",
                    consumerRecord.key(),
                    consumerRecord.value(),
                    consumerRecord.topic(),
                    consumerRecord.offset())
            )
            //.doOnNext(consumerRecord -> log.info("Record received from partition {} in thread {}", consumerRecord.partition(),Thread.currentThread().getName()))
            .doOnNext(s -> sendToKafka(s, destinationTopic))
            .map(ConsumerRecord::value)
            .doOnNext(record -> log.debug("successfully consumed {}={}", Metric[].class.getSimpleName(), record))
            .doOnError(exception -> log.debug("Error occurred while processing the message, attempting retry. Error message: {}", exception.getMessage()))
            .retryWhen(Retry.backoff(Integer.parseInt(retryAttempts), Duration.ofSeconds(Integer.parseInt(retryAttemptsDelay))).transientErrors(true))
            .onErrorContinue((exception, errorConsumerRecord) -> {
                ReceiverRecordException recordException = (ReceiverRecordException) exception;
                log.debug("Retries exhausted for : {}", recordException);
                recordException.getRecord().receiverOffset().acknowledge();
                repeatConsumer.set(true);
            })
            .repeat(repeatConsumer::get); // will create a new consumer if the existing consumer stops
}

public class ReceiverRecordException extends RuntimeException {
    private final ReceiverRecord record;

    ReceiverRecordException(ReceiverRecord record, Throwable t) {
        super(t);
        this.record = record;
    }

    public ReceiverRecord getRecord() {
        return this.record;
    }
}
Test:
@Test
public void readWriteCreatesNewConsumerWhenCurrentConsumerStops() {
    AtomicInteger recordNumber = new AtomicInteger(0);
    Mockito
            .when(reactiveKafkaConsumerTemplate.receiveAutoAck())
            .thenReturn(
                    Flux.create(consumerRecordFluxSink -> {
                        if (recordNumber.getAndIncrement() < 5) {
                            consumerRecordFluxSink.error(new RuntimeException("Kafka down"));
                        } else {
                            consumerRecordFluxSink.next(createConsumerRecord(validMessage));
                            consumerRecordFluxSink.complete();
                        }
                    })
            );
    Flux<String> actual = service.readWrite();
    StepVerifier.create(actual)
            .verifyComplete();
}
When I run the test, I get the retry exception: onError(reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3 in a row (3 total))
My understanding was that onErrorContinue would catch the exception and then continue with the next records, but it looks like the exception is propagated instead. Since an exception is thrown, how does repeat() work? I would really appreciate it if someone could help me understand how to test this scenario.
Your onErrorContinue() casts with (ReceiverRecordException)exception, which is definitely not the emitted RetryExhaustedException ... – Diversified
RetryExhaustedException, where that cast would lead to the ClassCastException, since the RetryExhaustedException is really not a ReceiverRecordException. That's my point: your onErrorContinue() is a bit wrong. – Diversified
See the onErrorContinue() JavaDocs: it does not work in all cases. I'm not sure if it is ignored for your StepVerifier or so, but better to consider onErrorResume() instead. The repeat() will do the trick to come back to the consumer. – Diversified
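The cast problem raised in the comments can be reproduced without Reactor or Kafka on the classpath. The following is a minimal sketch in plain Java, not the asker's code: `CastGuardSketch`, `RetryExhaustedStandIn`, `unguardedHandler` and `guardedHandler` are hypothetical names, the nested `ReceiverRecordException` is a simplified stand-in whose record is just a `String`, and the handlers return a description instead of acknowledging an offset.

```java
// Stand-in types only: hypothetical simplifications, not the real Reactor or reactor-kafka classes.
public class CastGuardSketch {

    /** Stand-in for the question's ReceiverRecordException; the record is reduced to a String. */
    static class ReceiverRecordException extends RuntimeException {
        private final String record;

        ReceiverRecordException(String record, Throwable cause) {
            super(cause);
            this.record = record;
        }

        String getRecord() {
            return record;
        }
    }

    /** Stand-in for the error that is emitted once the retries are exhausted. */
    static class RetryExhaustedStandIn extends RuntimeException {
        RetryExhaustedStandIn(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Mirrors the handler in the question: it casts without checking the type. */
    static String unguardedHandler(Throwable error) {
        // Throws ClassCastException for anything that is not a ReceiverRecordException.
        ReceiverRecordException e = (ReceiverRecordException) error;
        return "acknowledged " + e.getRecord();
    }

    /** Checks the type first, so unexpected errors take a separate branch. */
    static String guardedHandler(Throwable error) {
        if (error instanceof ReceiverRecordException) {
            ReceiverRecordException e = (ReceiverRecordException) error;
            return "acknowledged " + e.getRecord();
        }
        return "unhandled " + error.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        Throwable exhausted = new RetryExhaustedStandIn(
                "Retries exhausted: 3/3", new RuntimeException("Kafka down"));

        System.out.println(guardedHandler(exhausted));
        try {
            unguardedHandler(exhausted);
        } catch (ClassCastException ex) {
            System.out.println("unguarded handler failed with ClassCastException");
        }
    }
}
```

In the real pipeline the same type check would sit inside whichever error operator is used (the comments suggest onErrorResume()), so that an exhausted-retry error is handled on its own branch rather than being cast to ReceiverRecordException.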