Enabling @KafkaListener to take in variable topic names from application.yml file
Asked Answered
M

3

15

I am attempting to load in multiple topics to a single @KafkaListener but am running into trouble as I believe it is looking for a constant value, but initializing the topics variable from the application.yml file causing something issues, I was wondering if someone could help me troubleshoot this issue, or provide me with direction into how to load multiple Kafka topics into a single KafkaListener.

I am able to listen to multiple topics in the same @KafkaListener by passing them in a comma delimited object as seen below:

@KafkaListener(topics = {
           "flight-events",
           "flight-time-events",
           "service-events",
           "flight-delay-events"
   })

I realize I could do an object with comma delimited values representing the topics, but I want to be able to add topics through a config file, rather than changing code in the code base.

I believe there may be the problem in that @KafkaListener needs to take in a constant value, and I am unable to define an annotation as a constant, is there any way around this?

KafkaWebSocketConnector.java

@Component
public class KafkaWebSocketConnector
{


   @Value("${spring.kafka.topics}")
   private String[] topics;

   @KafkaListener(topics = topics)
   public void listen(ConsumerRecord<?, Map<String, String>> message)
   {
      log.info("Received messages on topic [{}]: [{}]", message.topic(), message.value());
      String dest = "/" + message.topic();
      log.info("destination = {}", dest);
      log.info("msg: {}", message);
      messageTemplate.convertAndSend(dest, message.value());
   }
}

application.yml

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: kafka-websocket-connector
    topics: flight-events,
      flight-time-events,
      canceled-events,
      pax-events,
      flight-delay-events
Margretmargreta answered 5/7, 2017 at 19:21 Comment(0)
M
28

Answer provided from @Gary Russell from this GitHub issue:

https://github.com/spring-projects/spring-kafka/issues/361

You can use a SpEL expression; there's an example in EnableKafkaIntegrationTests...

@KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}")

In my case "#{'${spring.kafka.topics}'.split(',')}"

I was able to implement the above code, (provided by Gary Russell) in order to answer the above question.

Margretmargreta answered 5/7, 2017 at 19:25 Comment(5)
That's called plagiarism... You have not to create GH issue for questions, please.Succulent
Sorry Artem, I know you're a tag team with Gary, is there a better citation that I can apply to this to resolve the issue you have with this answer?Margretmargreta
You have to add the link to that GH issue for more info and accept your answer. In the future just keep in mind that we might not be available on-line to answer questions on GH. That's why SO is the best place to start with.Succulent
Thanks Artem, I understand and hopefully the changes that I made to the answer will alleviate your original issue. As for accepting the answer, SO doesn't allow me to accept my own answer to my own question for 48 hours, but I will be accepting the answer ASAP.Margretmargreta
If you don't need multiple topics Listener the expression might be simpler: @KafkaListener(id = "myListener", topics = "${topics.myTopicName}"Electrician
C
0

Thanks for @terrabl's answer. However there was a minor mistake. We need to use .split(', ') instead of .split(',')

Cottony answered 5/10, 2023 at 18:25 Comment(0)
G
-1

This solution work well :

@KafkaListener(
      topics = "#{propertiesClass.getListOfTopicToListen()}",

In the yaml file

topicsToListen:
        - topic-1
        - topic-2
        - topic-3
Glenoid answered 5/12, 2023 at 13:54 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.