How to create multiple instances of KafkaReceiver in Spring Reactor Kafka

I have a reactive Kafka application that reads data from a topic and writes to another topic. The topic has multiple partitions, and I want to create the same number of consumers (in the same consumer group) as there are partitions in the topic. From what I understand from this thread, .receive() will create only one instance of KafkaReceiver, which will read from all the partitions in the topic. So I would need multiple receivers to read from different partitions in parallel.

To do that I came up with the following code:

@Bean
public ReceiverOptions<String, String> kafkaReceiverOptions(String topic, KafkaProperties kafkaProperties) {
    ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
    return basicReceiverOptions.subscription(Collections.singletonList(topic))
            .addAssignListener(receiverPartitions -> log.debug("onPartitionAssigned {}", receiverPartitions))
            .addRevokeListener(receiverPartitions -> log.debug("onPartitionsRevoked {}", receiverPartitions));
}

@Bean
public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(ReceiverOptions<String, String> kafkaReceiverOptions) {
    return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
}

@Bean
public ReactiveKafkaProducerTemplate<String, List<Object>> kafkaProducerTemplate(KafkaProperties properties) {
    Map<String, Object> props = properties.buildProducerProperties();
    return new ReactiveKafkaProducerTemplate<>(SenderOptions.create(props));
}


public void run(String... args) {
    for (int i = 0; i < topicPartitionsCount; i++) {
        readWrite(destinationTopic).subscribe();
    }
}


public Flux<String> readWrite(String destTopic) {
    return kafkaConsumerTemplate
            .receiveAutoAck()
            .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                    consumerRecord.key(),
                    consumerRecord.value(),
                    consumerRecord.topic(),
                    consumerRecord.offset()))
            .doOnNext(consumerRecord -> log.info("Record received from partition {} in thread {}",
                    consumerRecord.partition(), Thread.currentThread().getName()))
            .doOnNext(consumerRecord -> sendToKafka(consumerRecord, destTopic))
            .map(ConsumerRecord::value)
            .onErrorContinue((exception, errorConsumer) ->
                    log.error("Error while consuming : {}", exception.getMessage()));
}

public void sendToKafka(ConsumerRecord<String, String> consumerRecord, String destTopic) {
    kafkaProducerTemplate.send(destTopic, consumerRecord.key(), transformRecord(consumerRecord))
            .doOnNext(senderResult -> log.info("Send completed for record from partition {} in thread {}",
                    consumerRecord.partition(), Thread.currentThread().getName()))
            .doOnSuccess(senderResult ->
                    log.debug("Sent record with key {} offset : {}", consumerRecord.key(),
                            senderResult.recordMetadata().offset()))
            .doOnError(exception ->
                    log.error("Error while sending message to destination topic : {}", exception.getMessage()))
            .subscribe();
}

When I tested this, it seemed to work correctly: multiple KafkaReceiver instances are created and they process the partitions in parallel. My question is: is this the most efficient way to create multiple instances? Is there another way in reactive Kafka to do this?

Cheery answered 27/10, 2021 at 23:55
What if you have to return the Kafka messages to your GET API, e.g. over a ServerSentEvent (SSE) connection? Do you have an idea how? Since the return type of run here is void, I need the same code but with the Kafka messages sent to an SSE channel. Please suggest. Criner

What you have done is correct.
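
For reference, the same fan-out can be written as a single pipeline. A minimal sketch, assuming the readWrite(String) method and the topicPartitionsCount and destinationTopic values from the question; it is equivalent to the question's for loop, since each subscription to receiveAutoAck() gets its own underlying consumer:

import reactor.core.publisher.Flux;

// One independent subscription per partition: flatMap subscribes to each
// inner Flux, creating topicPartitionsCount concurrent receiver pipelines.
Flux.range(0, topicPartitionsCount)
        .flatMap(i -> readWrite(destinationTopic))
        .subscribe();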

Lariat answered 28/10, 2021 at 13:36
Thanks for confirming. A question I had with this approach: if one of the instances fails and stops consuming from its respective partition, would that partition be picked up by any of the other instances? I want to make sure I won't lose any data. Cheery
Yes; a rebalance will redistribute the partitions. There are properties you can set to delay a rebalance if, say, you want to stop and restart a receiver. Lariat
My understanding is that a rebalance would occur automatically if not specified otherwise. Is my understanding correct? Cheery
That is correct, yes. Lariat
One issue I see here: say I have 10 partitions and I create 10 receiver instances. If one of the instances fails, a rebalance occurs and the 10 partitions will be distributed among 9 instances. There is no way to get back to 10 instances; we would continue to run with 9 instances unless the application restarts. Is my understanding correct? Cheery
Ignore my comment about delaying the rebalance; the property I was thinking of was group.initial.rebalance.delay.ms kafka.apache.org/documentation/… which avoids multiple rebalances while consumers are first starting. It defaults to 3 seconds, so as long as you subscribe the receivers within that time, you can avoid rebalances during startup (or you can increase it). It's a broker property. Lariat
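
For reference, since this is a broker-side setting and not a consumer property, a minimal server.properties sketch with the 3-second default mentioned above:

# Broker configuration: how long the group coordinator waits for more
# consumers to join before running the first rebalance (default 3000 ms).
group.initial.rebalance.delay.ms=3000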
Or you can restart that consumer. You can add code to the flux to do that after an error, or restart it some other way. Lariat
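
A minimal sketch of one way to do that, assuming the readWrite method from the question. Note that onErrorContinue swallows per-record errors, so this retry only kicks in if the receiver flux itself terminates with an error:

import java.time.Duration;
import reactor.util.retry.Retry;

// If the receiver flux dies with an error, resubscribe after a backoff;
// resubscribing creates a fresh consumer, which rejoins the group and
// triggers a rebalance back to the full instance count.
readWrite(destinationTopic)
        .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(5)))
        .subscribe();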
Can you help with restarting the consumer in the flux? I was looking at the documentation and haven't found the option yet. Cheery
See this issue: github.com/reactor/reactor-kafka/issues/… Lariat
I tried to implement restarting the consumer based on the issue you mentioned, but I have a couple of questions on that. Since it's different from this question, I created another question specifically for that: #69762433. Can you please take a look when you get a chance. Cheery
After reading from Kafka, I am transforming the record and writing to another Kafka topic. I have added the sendToKafka code in the OP. Since I have multiple consumers, each consumer runs on a different thread; would the Kafka send also run on the same thread? I tried to test by logging the thread name to understand the thread workflow: the receive thread name is different for each consumer, but on the Kafka send the thread name is the same for all the consumers. I don't understand how that works; I would expect it to be different for all consumers on send as well. Can you help me understand? Cheery
@GaryRussell how can this code be used to send Kafka messages to an SSE channel as ServerSentEvent? Criner
@GaryRussell how can we get the flux result and send it to the caller using the above code? I have to send the Kafka flux data to the user calling the API as a ServerSentEvent. Please suggest. Criner
Is it really working in parallel? I did the same thing in a slightly different way and I never see them processing in parallel: @EventListener(ApplicationReadyEvent.class) public void begin() { for (int i = 0; i < 3; i++) { consumer.receiveAutoAck().doOnNext(e -> log.info("{}", e)).subscribe(); } } What I always see is: the first consumer processes all its events, then the 2nd consumer, and then the 3rd consumer. Deckert
