Can a single Spring KafkaConsumer listener listen to multiple topics?
R

3

17

Does anyone know whether a single listener can listen to multiple topics, as shown below? I know that just "topic1" works, but what if I want to add more topics? Could you please show an example for both cases below? Thanks for the help!

@KafkaListener(topics = "topic1,topic2")
public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
    System.out.println(record);
} 

or

ContainerProperties containerProps = new ContainerProperties(new TopicPartitionInitialOffset("topic1, topic2", 0));
Reichsmark answered 25/1, 2017 at 20:36 Comment(1)
In my case, I need to connect to one Kafka topic with Spring Boot to fetch data. That data contains the name of another Kafka topic; I need to read it, connect to the new topic, fetch its data, and perform some business logic. Could you please help me write the Spring Boot code?Archduchess
S
24

Yes, just follow the @KafkaListener JavaDocs:

/**
 * The topics for this listener.
 * The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
 * Expression must be resolved to the topic name.
 * Mutually exclusive with {@link #topicPattern()} and {@link #topicPartitions()}.
 * @return the topic names or expressions (SpEL) to listen to.
 */
String[] topics() default {};

/**
 * The topic pattern for this listener.
 * The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
 * Expression must be resolved to the topic pattern.
 * Mutually exclusive with {@link #topics()} and {@link #topicPartitions()}.
 * @return the topic pattern or expression (SpEL).
 */
String topicPattern() default "";

/**
 * The topicPartitions for this listener.
 * Mutually exclusive with {@link #topicPattern()} and {@link #topics()}.
 * @return the topic names or expressions (SpEL) to listen to.
 */
TopicPartition[] topicPartitions() default {};

So your use case should look like this:

@KafkaListener(topics = {"topic1" , "topic2"})
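As a rough illustration of why the entries must be separate array elements: a single string such as "topic1,topic2" is treated as one topic name, and a comma is not among the characters Kafka allows in topic names (ASCII letters, digits, '.', '_' and '-', up to 249 characters). The sketch below is plain Java and does not call Spring or the Kafka client; the class and method names are made up for this example.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Spring or Kafka: mirrors Kafka's documented topic-name rules.
final class TopicNameCheck {

    // Legal characters for a topic name: ASCII letters, digits, '.', '_' and '-'.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_LENGTH = 249;

    static boolean isLegalTopicName(String name) {
        return !name.equals(".")
                && !name.equals("..")
                && name.length() <= MAX_LENGTH
                && LEGAL.matcher(name).matches();
    }

    public static void main(String[] args) {
        // The comma-joined string is checked as ONE name, the way a single array entry would be.
        for (String name : List.of("topic1,topic2", "topic1", "topic2")) {
            System.out.println(name + " -> " + isLegalTopicName(name));
        }
    }
}
```

The comma-joined value fails the check, while each name on its own passes, which is consistent with listing the topics as separate entries in the annotation.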
Swiger answered 25/1, 2017 at 20:48 Comment(13)
And for the other case, you need a TopicPartitionInitialOffset for each one.Heated
thank you both, VERY HELPFUL! also got Gary's way working!Reichsmark
Is there a way to configure different containerFactories for different topics with the same listener? In my case, the message format will be different for different topics.Nona
No, you need a separate @KafkaListener for each containerFactory.Swiger
May I know how this works for a batch listener? Suppose we have a batch listener with 2 topics and concurrency 1: can a single batch poll return data from both topics? @ArtemBilan and @GaryRussellCrossopterygian
That's correct. See the JavaDocs of KafkaConsumer.poll(): `Fetch data for the topics or partitions specified using one of the subscribe/assign APIs.`Swiger
2.3.2 added a new property subBatchPerPartition, when true, the container will hand the listener a batch of records for each partition. It breaks up the records returned by the poll.Heated
Thank you so much, sir. Could you point me to any information on separating batches by topic instead of by partition? @GaryRussell and ArtemBilanCrossopterygian
There is no support for that; we can't control what Kafka sends us. ConsumerRecords has no API to get just the records for a single topic - only to get the records for a single partition. In future, don't ask new questions in comments on old answers.Heated
Well, that way it would be better to have separate @KafkaListener for each topic.Swiger
@GaryRussell Is it good practice to listen to multiple topics with a single listener? Let's say 5 topics, each receiving data at a rate of 100 messages per second (just an example).Valois
I don't answer new questions in comments on old answers; especially when they are nearly 3 years old. You should ask a new question. That said, your question is difficult to answer; it depends on many things.Heated
@ArtemBilan Is there a way to specify multiple topics using a property value like topics= "${mutlipleTopics}" and define multipleTopics in the application.properties file? I tried that, but I am getting an InvalidTopicException.Dinny
P
5

To read multiple topic names from the application.properties file:

@KafkaListener(topics = { "${spring.kafka.topic1}", "${spring.kafka.topic2}" })
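If the topic names live in one comma-separated property instead of one property per topic, the value has to be split into separate names before it is used. In Spring this is commonly written as a SpEL expression such as `#{'${app.kafka.topics}'.split(',')}`. The plain-Java sketch below only illustrates the splitting step; the property key `app.kafka.topics` and the class name are assumptions made for this example, not something from the answer.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.Properties;

// Hypothetical helper: turns one comma-separated property value into separate topic names.
final class TopicListProperty {

    static String[] splitTopics(String value) {
        return Arrays.stream(value.split(","))
                .map(String::trim)              // tolerate "topic1, topic2"
                .filter(s -> !s.isEmpty())      // ignore stray commas
                .toArray(String[]::new);
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for an application.properties file with an assumed key.
        Properties props = new Properties();
        props.load(new StringReader("app.kafka.topics=topic1, topic2\n"));

        String[] topics = splitTopics(props.getProperty("app.kafka.topics"));
        System.out.println(Arrays.toString(topics));
    }
}
```

Note that a bare `split(',')` in SpEL does not trim whitespace, so with that form the property value is best written without spaces after the commas.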
Purgatorial answered 3/4, 2022 at 23:25 Comment(0)
D
0

One more option: create a bean with a method that returns the topic names:

@Component
public class TopicsNameResolver {

    private final SomeProps props;

    public String[] topics() {
        props...
    }
}

Then reference the bean by name from the listener using SpEL:

@KafkaListener(topics = "#{topicsNameResolver.topics()}")
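For illustration, here is one way the elided resolver could be filled in, written as plain Java without Spring annotations so it runs on its own. The shape of `SomeProps` (a holder for a list of topic names) is purely an assumption, since the answer does not show it. In a Spring application the class would carry `@Component`, and its default bean name `topicsNameResolver` is what the SpEL expression refers to.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the resolver from the answer; in Spring it would be annotated with @Component.
final class TopicsNameResolver {

    private final SomeProps props;

    TopicsNameResolver(SomeProps props) {
        this.props = props;
    }

    // Returns one array element per topic, which is the form the topics attribute expects.
    public String[] topics() {
        return props.getTopicNames().toArray(new String[0]);
    }

    public static void main(String[] args) {
        TopicsNameResolver resolver =
                new TopicsNameResolver(new SomeProps(List.of("topic1", "topic2")));
        System.out.println(Arrays.toString(resolver.topics()));
    }
}

// Hypothetical stand-in for the answer's SomeProps; its real fields are not shown in the answer.
final class SomeProps {

    private final List<String> topicNames;

    SomeProps(List<String> topicNames) {
        this.topicNames = List.copyOf(topicNames);
    }

    List<String> getTopicNames() {
        return topicNames;
    }
}
```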
Devon answered 24/6, 2024 at 9:6 Comment(0)