How to pass topics dynamically to a kafka listener?
A

6

11

For a couple of days I have been trying to find a way to pass topics to a Kafka listener dynamically, rather than referencing them through keys from a Java DSL. Has anyone done this before, or can anyone shed some light on the best way to achieve it?

Antacid answered 25/9, 2017 at 8:6 Comment(0)
B
3

You cannot "dynamically pass topics to Kafka listener "; you have to programmatically create a listener container instead.

Bankroll answered 26/9, 2017 at 13:27 Comment(15)
Thank you, I had the same opinion. However, it would be very helpful if you could post a code snippet showing how to do it.Antacid
Just create a new KafkaMessageListenerContainer (or ConcurrentMessageListenerContainer) and start() it.Bankroll
Start it where? My consumer is currently a POJO listener annotated with @KafkaListener. There is a ConsumerConfig file where I write my consumer configuration; the same file defines a containerFactory and a container. But the @KafkaListener annotation doesn't accept a container as a parameter. Where do I use this container object and start it?Antacid
You can't create @KafkaListener programmatically; just the underlying listener container with a regular MessageListener. You could create a @KafkaListener programmatically, if you load it into a new ApplicationContext.Bankroll
Yes, I will create a MessageListener programmatically in my consumer config file, but how would the application know that this is the consumer listening to the topics? Is there code posted somewhere that I can refer to?Antacid
Sorry; I don't understand the question; to consume from a new topic; simply create a new container, configure it and call start().Bankroll
Creation and Config of container: @Bean public KafkaMessageListenerContainer<String, String> container() throws Exception { ContainerProperties properties = new ContainerProperties(this.topics); // set more properties properties.setPauseEnabled(true); properties.setPauseAfter(0); return new KafkaMessageListenerContainer<>(consumerFactory(), properties); }Antacid
Starting of container: @Bean public void startContainer(){KafkaMessageListenerContainer<String, String> container = container(); container.start(); } Am i right?Antacid
Don't put code in comments; it's hard to read; edit the question instead. Yes, it looks correct.Bankroll
Sure, will do! Now I assume the listener has started. How do I access the records posted by the producer onto the topic using this container, similar to listener(ConsumerRecord<?,?> record) with @KafkaListener?Antacid
containerProperties.setMessageListener(myListener). Where myListener implements one of the listener interfaces.Bankroll
Thanks a ton @Gary, It worked like a charm. Please bear with me for some more time and have a look at this one too: #46447619Antacid
In the above discussed implementation, if I use ConcurrentMessageListenerContainer, is there any possible way to consume messages in a batch?Awaken
Don't ask new questions in comments; especially when the answer is 4 years old; the framework has moved on since then. Yes, setBatchListener(true) on the container factory. If you need more help, ask a new question.Bankroll
If you are creating the container yourself, just inject a BatchMessageListener; if you are using a container factory to create the container, setBatchListener(true) on the factory so that it will use the correct adapter.Bankroll
B
16

The easiest solution I found was to use SpEL:

@Autowired
private SomeBean kafkaTopicNameProvider;

@KafkaListener(topics = "#{kafkaTopicNameProvider.provideName()}")
public void listener() { ... }
Brooklyn answered 12/8, 2019 at 13:47 Comment(3)
Thanks. Alternatively, the topics can come from an application.yml property: @KafkaListener(topics = {"${someprop}"})Flem
What is more, kafkaTopicNameProvider.provideName() can also return a String[], which allows configuring multiple topics with a single SpEL expression.Sewellel
If your class is named "SomeBean" and you want to access it in SpEL, use "#{someBean.field}". Note the lower-case first letter.Chippy
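
To make the String[] variant mentioned in the comments concrete, here is a minimal sketch of what such a provider could look like. The class name TopicNameProvider and the comma-separated input are assumptions for illustration; in a Spring application the class would have to be registered as a bean named kafkaTopicNameProvider for the SpEL expression above to resolve it. As far as I know, the expression is evaluated once when the listener is registered at startup, so this selects topics at startup rather than changing them while the application runs.

```java
import java.util.Arrays;

// Hypothetical provider (not from the original answer). In Spring it would be
// a bean named "kafkaTopicNameProvider" so the SpEL expression can find it.
class TopicNameProvider {
    private final String rawTopics;

    // rawTopics is assumed to be a comma-separated list, e.g. read from configuration.
    TopicNameProvider(String rawTopics) {
        this.rawTopics = rawTopics;
    }

    // Splits the list on commas, trims each entry, and drops empty entries.
    String[] provideName() {
        return Arrays.stream(rawTopics.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .toArray(String[]::new);
    }
}

class TopicNameProviderDemo {
    public static void main(String[] args) {
        TopicNameProvider provider = new TopicNameProvider("orders, payments,,audit ");
        System.out.println(Arrays.toString(provider.provideName())); // prints [orders, payments, audit]
    }
}
```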
A
4

Here is a working solution:

// Start a listener container without using the "@KafkaListener" annotation
Map<String, Object> consumerProps = consumerProps("my-srv1:9092", "my-group", "false");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
ContainerProperties containerProperties = new ContainerProperties("my-topic");
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(cf, containerProperties);
final BlockingQueue<ConsumerRecord<String, String>> records = new LinkedBlockingQueue<>();
// Log each record and put it on the queue so other threads can consume it
container.setupMessageListener((MessageListener<String, String>) record -> {
    log.info("Message received: " + record);
    records.add(record);
});
container.start();

/**
 * Set up test properties for a {@code <String, String>} consumer.
 * @param brokersCommaSep the bootstrapServers property (comma separated servers).
 * @param group the group id.
 * @param autoCommit the auto commit.
 * @return the properties.
 */
public static Map<String, Object> consumerProps(String brokersCommaSep, String group, String autoCommit) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersCommaSep);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
    // Key and value deserializers must match the <String, String> consumer factory
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
}

Hope it can help.
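
The listener above only adds records to the queue; reading them back is not shown. A minimal sketch of the consuming side, using plain strings as a stand-in for ConsumerRecord and a short-lived thread as a stand-in for the listener container (both are assumptions for illustration, as is the helper name awaitNext):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class RecordQueueDemo {
    // Waits up to timeoutMs for the next element; returns null on timeout or interruption.
    static <T> T awaitNext(BlockingQueue<T> records, long timeoutMs) {
        try {
            return records.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for callers
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> records = new LinkedBlockingQueue<>();
        // Stand-in for the listener thread: publishes one "record" after a short delay.
        Thread listener = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            records.add("message-1");
        });
        listener.start();
        System.out.println(awaitNext(records, 2000)); // the record, once it arrives
        System.out.println(awaitNext(records, 200));  // nothing else was published
        listener.join();
    }
}
```

Polling with a timeout keeps the consuming thread from blocking forever when no message arrives, which is mostly useful in tests.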

Apical answered 7/8, 2018 at 14:39 Comment(0)
P
3

I built a Kafka listener that supports registration, de-registration, start, and stop at runtime.

public class KafkaListener {

    private final KafkaListenerContainerFactory kafkaListenerContainerFactory;
    private final Map<String, MessageListenerContainer> registeredTopicMap;

    // Excerpt: the constructor, the lock field and getRegisteredTopics() are in the full source.

    /** Registers listeners at runtime for every supplied topic that is not registered yet. */
    public void register(final Supplier<Set<String>> topicSupplier, final Supplier<MessageListener> messageListenerSupplier) {
        synchronized (lock) {
            final Set<String> registeredTopics = getRegisteredTopics();
            final Set<String> topics = topicSupplier.get();

            if (topics.isEmpty()) {
                return;
            }

            topics.stream()
                    .filter(topic -> !registeredTopics.contains(topic))
                    .forEach(topic -> doRegister(topic, messageListenerSupplier.get()));
        }
    }

    /** Creates a container for the topic, attaches the listener, starts it, and records it. */
    private void doRegister(final String topic, final MessageListener messageListener) {
        final MessageListenerContainer messageListenerContainer = kafkaListenerContainerFactory.createContainer(topic);
        messageListenerContainer.setupMessageListener(messageListener);
        messageListenerContainer.start();

        registeredTopicMap.put(topic, messageListenerContainer);
    }
}

Full source code : https://github.com/pkgonan/kafka-listener

To try it, first start the environment:

docker-compose up -d

Then call the API:

curl -XPOST /consumers/order/register .....

curl -XPOST /consumers/order/de-register .....

curl -XPOST /consumers/order/stop

curl -XPOST /consumers/order/start
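
The excerpt above shows only registration. As a rough sketch of how registration and de-registration fit together, here is a dependency-free version of the same idea. The Container interface is a stand-in for Spring's MessageListenerContainer, and TopicRegistry, register, deRegister and registeredTopics are hypothetical names, not taken from the linked repository.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

// Stand-in for MessageListenerContainer (assumption: only start/stop matter here).
interface Container {
    void start();
    void stop();
}

class TopicRegistry {
    private final Function<String, Container> containerFactory;
    private final Map<String, Container> containers = new HashMap<>();

    TopicRegistry(Function<String, Container> containerFactory) {
        this.containerFactory = containerFactory;
    }

    // Creates and starts a container for the topic; returns false if it is already registered.
    synchronized boolean register(String topic) {
        if (containers.containsKey(topic)) {
            return false;
        }
        Container container = containerFactory.apply(topic);
        container.start();
        containers.put(topic, container);
        return true;
    }

    // Stops and forgets the container; returns false if the topic was not registered.
    synchronized boolean deRegister(String topic) {
        Container container = containers.remove(topic);
        if (container == null) {
            return false;
        }
        container.stop();
        return true;
    }

    // Returns a sorted snapshot of the currently registered topics.
    synchronized Set<String> registeredTopics() {
        return new TreeSet<>(containers.keySet());
    }
}

class TopicRegistryDemo {
    public static void main(String[] args) {
        TopicRegistry registry = new TopicRegistry(topic -> new Container() {
            public void start() { System.out.println("started " + topic); }
            public void stop() { System.out.println("stopped " + topic); }
        });
        registry.register("order");
        registry.register("order"); // ignored: already registered
        System.out.println(registry.registeredTopics());
        registry.deRegister("order");
    }
}
```

With spring-kafka on the classpath, the factory function would delegate to something like kafkaListenerContainerFactory.createContainer(topic) and attach a listener before starting, as in the excerpt above.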
Perfidy answered 9/9, 2019 at 14:49 Comment(0)
Z
1

You can change topics dynamically at runtime:

@Component
public class StoppingErrorHandler implements ErrorHandler {

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        ConcurrentMessageListenerContainer listenerContainer = (ConcurrentMessageListenerContainer)kafkaListenerEndpointRegistry.getListenerContainer("fence");
        ContainerProperties cp=listenerContainer.getContainerProperties();
        String[] topics =cp.getTopics();
        topics[0]="gaonb";
        listenerContainer.stop();
        listenerContainer.start();
    }
}
Zenas answered 1/12, 2017 at 5:15 Comment(1)
This no longer works: getTopics() now returns a copy of the topics array.Ramburt
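
To illustrate why the approach stops working once the getter returns a copy, here is a small stand-alone sketch. TopicHolder is a hypothetical stand-in for the container properties, assuming the defensive-copy behaviour described in the comment above; it is not the real spring-kafka class.

```java
import java.util.Arrays;

// Hypothetical stand-in for a properties object whose getter returns a defensive copy.
class TopicHolder {
    private final String[] topics;

    TopicHolder(String... topics) {
        this.topics = topics.clone();
    }

    // Returns a copy, so callers cannot modify the internal array.
    String[] getTopics() {
        return topics.clone();
    }
}

class DefensiveCopyDemo {
    public static void main(String[] args) {
        TopicHolder holder = new TopicHolder("fence");
        String[] copy = holder.getTopics();
        copy[0] = "gaonb"; // changes only the caller's copy
        System.out.println(Arrays.toString(holder.getTopics())); // prints [fence]
    }
}
```

Under that assumption, restarting the container after writing into the returned array would subscribe to the original topic again, so the remaining option is to stop the old container and create a new one for the new topic, as the other answers here describe.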
C
0

In case anyone is looking to build a module that can be used as a library, check out https://github.com/313hemant313/TheGameKafka

Based on clientName we can get KafkaProducer and KafkaConsumer, sample usage:

TheGameKafkaProducer<String, String> theGameKafkaProducer = theGameKafkaProducerFactory.getTheGameKafkaProducer(
    "testClient");
theGameKafkaProducer.send("this is a test msg from TheGameKafkaProducer");

TheGameKafkaConsumer<String, String> theGameKafkaConsumer = theGameKafkaConsumerFactory.getTheGameKafkaConsumer(
    "testClient");
theGameKafkaConsumer.listen(messageListener());

private MessageListener<String, String> messageListener() {
  return rec -> log.info("TheGameKafkaConsumer listened : {}", rec);
}

Sample config:

the-game-kafka:
  clientConsumerProperties:
    - clientName: "testClient"
      topic: "testTopic"
      enabled: true
      kafkaProperties:
        consumer:
          bootstrapServers: localhost:9094
          groupId: "tgk-group"
          autoOffset: earliest
          keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          properties:
            spring:
              json:
                trusted:
                  packages: "*"
              deserializer:
                value:
                  delegate:
                    class: org.apache.kafka.common.serialization.StringDeserializer
  clientProducerProperties:
    - clientName: "testClient"
      topic: "testTopic"
      enabled: true
      kafkaProperties:
        producer:
          bootstrapServers: localhost:9094

Inspired by https://github.com/pkgonan/kafka-listener.

Consolidation answered 26/8, 2023 at 17:28 Comment(0)
