For a couple of days I've been trying out ways to dynamically pass topics to a Kafka listener rather than referencing them through keys from a Java DSL. Has anyone done this before, or could anyone shed some light on the best way to achieve it?
You cannot "dynamically pass topics to Kafka listener "; you have to programmatically create a listener container instead.
Create a KafkaMessageListenerContainer (or ConcurrentMessageListenerContainer) and start() it. – Bankroll
You cannot create a @KafkaListener programmatically; just the underlying listener container with a regular MessageListener. You could create a @KafkaListener programmatically, if you load it into a new ApplicationContext. – Bankroll
[...] start(). – Bankroll
[...] containerProperties.setMessageListener(myListener), where myListener implements one of the listener interfaces. – Bankroll
[...] setBatchListener(true) on the container factory. If you need more help, ask a new question. – Bankroll
The easiest solution I found was to use SpEL:
@Autowired
private SomeBean kafkaTopicNameProvider;
@KafkaListener(topics = "#{kafkaTopicNameProvider.provideName()}")
public void listener() { ... }
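The answer does not show what SomeBean looks like. As a rough sketch, assuming the topic names arrive as a single comma-separated string (the class name KafkaTopicNameProvider and its constructor argument are hypothetical, not part of Spring Kafka), the provider could be a plain class like this:

```java
import java.util.Arrays;

/**
 * Hypothetical provider; in a Spring application it would be registered
 * as a bean named "kafkaTopicNameProvider" so the SpEL expression finds it.
 */
final class KafkaTopicNameProvider {

    private final String commaSeparatedTopics;

    KafkaTopicNameProvider(String commaSeparatedTopics) {
        this.commaSeparatedTopics = commaSeparatedTopics;
    }

    /** Splits on commas, trims whitespace, and drops empty entries. */
    String[] provideName() {
        return Arrays.stream(commaSeparatedTopics.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .toArray(String[]::new);
    }
}
```

As far as I know, the expression is evaluated once, when the listener is set up, so this picks the topics at startup rather than changing them while the application runs.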
[...] @KafkaListener(topics = {"${someprop}"}) – Flem
kafkaTopicNameProvider.provideName() can also return a String[], which lets you configure multiple topics with a single SpEL expression. – Sewellel
Here is a working solution:
// Start a listener container without using the "@KafkaListener" annotation
Map<String, Object> consumerProps = consumerProps("my-srv1:9092", "my-group", "false");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
ContainerProperties containerProperties = new ContainerProperties("my-topic");
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(cf, containerProperties);
// Collect received records so they can be inspected later
final BlockingQueue<ConsumerRecord<String, String>> records = new LinkedBlockingQueue<>();
container.setupMessageListener((MessageListener<String, String>) record -> {
    log.info("Message received: " + record);
    records.add(record);
});
container.start();
/**
 * Set up test properties for a {@code <String, String>} consumer.
 * @param brokersCommaSep the bootstrapServers property (comma separated servers).
 * @param group the group id.
 * @param autoCommit the auto commit.
 * @return the properties.
 */
public static Map<String, Object> consumerProps(String brokersCommaSep, String group, String autoCommit) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersCommaSep);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
    // The key deserializer must match the String key type of the consumer factory
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
}
Hope this helps.
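The snippet above puts every record into a BlockingQueue but does not show how to read them back. Here is a minimal sketch of waiting for an expected number of messages with a timeout. It uses a generic element type in place of ConsumerRecord, and the helper RecordDrain is hypothetical, not part of Spring Kafka:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper; not part of Spring Kafka. */
final class RecordDrain {

    private RecordDrain() {
    }

    /**
     * Takes up to {@code expected} elements from the queue, waiting at most
     * {@code timeoutMillis} for each one. Returns early if a wait times out
     * or the calling thread is interrupted.
     */
    static <T> List<T> take(BlockingQueue<T> queue, int expected, long timeoutMillis) {
        List<T> received = new ArrayList<>();
        try {
            while (received.size() < expected) {
                T next = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (next == null) {
                    break; // nothing arrived within the timeout
                }
                received.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return received;
    }
}
```

With the queue from the answer, a call such as RecordDrain.take(records, 1, 10_000) would wait up to ten seconds for the first message.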
I made a Kafka listener that supports runtime registration, de-registration, start, and stop.
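public class KafkaListener {

    // The lock, the constructor, and getRegisteredTopics() are not shown in this excerpt;
    // the versions here are minimal assumptions (see the full source for the real ones).
    private final Object lock = new Object();
    private final KafkaListenerContainerFactory kafkaListenerContainerFactory;
    private final Map<String, MessageListenerContainer> registeredTopicMap = new ConcurrentHashMap<>();

    public KafkaListener(final KafkaListenerContainerFactory kafkaListenerContainerFactory) {
        this.kafkaListenerContainerFactory = kafkaListenerContainerFactory;
    }

    /** Registers a listener at runtime for every supplied topic that is not registered yet. */
    public void register(final Supplier<Set<String>> topicSupplier, final Supplier<MessageListener> messageListenerSupplier) {
        synchronized (lock) {
            final Set<String> registeredTopics = getRegisteredTopics();
            final Set<String> topics = topicSupplier.get();
            if (topics.isEmpty()) {
                return;
            }
            topics.stream()
                    .filter(topic -> !registeredTopics.contains(topic))
                    .forEach(topic -> doRegister(topic, messageListenerSupplier.get()));
        }
    }

    private Set<String> getRegisteredTopics() {
        return registeredTopicMap.keySet();
    }

    /** Creates a container for the topic, attaches the listener, and starts it. */
    private void doRegister(final String topic, final MessageListener messageListener) {
        final MessageListenerContainer messageListenerContainer = kafkaListenerContainerFactory.createContainer(topic);
        messageListenerContainer.setupMessageListener(messageListener);
        messageListenerContainer.start();
        registeredTopicMap.put(topic, messageListenerContainer);
    }
}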
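The excerpt shows registration only; de-registration is mentioned but not shown. Below is a sketch of how the two could fit together. It is not the code from the linked repository: StoppableContainer, FlagContainer, and TopicRegistry are hypothetical names, and the interface stands in for Spring's MessageListenerContainer so the example runs without Kafka.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

/** Hypothetical stand-in for the start/stop part of a listener container. */
interface StoppableContainer {
    void start();

    void stop();

    boolean isRunning();
}

/** Trivial container used only to exercise the registry. */
final class FlagContainer implements StoppableContainer {
    private boolean running;

    @Override
    public void start() {
        running = true;
    }

    @Override
    public void stop() {
        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }
}

final class TopicRegistry {
    private final Map<String, StoppableContainer> containers = new HashMap<>();
    private final Function<String, StoppableContainer> containerFactory;

    TopicRegistry(Function<String, StoppableContainer> containerFactory) {
        this.containerFactory = containerFactory;
    }

    /** Creates and starts a container for the topic; returns false if it was already registered. */
    synchronized boolean register(String topic) {
        if (containers.containsKey(topic)) {
            return false;
        }
        StoppableContainer container = containerFactory.apply(topic);
        container.start();
        containers.put(topic, container);
        return true;
    }

    /** Stops and forgets the topic's container; returns false if the topic was unknown. */
    synchronized boolean deregister(String topic) {
        StoppableContainer container = containers.remove(topic);
        if (container == null) {
            return false;
        }
        container.stop();
        return true;
    }

    /** Snapshot of the currently registered topics. */
    synchronized Set<String> registeredTopics() {
        return new TreeSet<>(containers.keySet());
    }
}
```

Removing the entry from the map before stopping the container means a later register call for the same topic creates a fresh container instead of reusing a stopped one.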
Full source code: https://github.com/pkgonan/kafka-listener
To try it, first start the environment:
docker-compose up -d
Then call the API:
curl -XPOST /consumers/order/register .....
curl -XPOST /consumers/order/de-register .....
curl -XPOST /consumers/order/stop
curl -XPOST /consumers/order/start
You can change topics dynamically at runtime!
@Component
public class StoppingErrorHandler implements ErrorHandler {

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        // Look up the container that was registered with the listener id "fence"
        ConcurrentMessageListenerContainer<?, ?> listenerContainer =
                (ConcurrentMessageListenerContainer<?, ?>) kafkaListenerEndpointRegistry.getListenerContainer("fence");
        ContainerProperties cp = listenerContainer.getContainerProperties();
        // Overwrite the first topic; this only takes effect if getTopics() returns
        // the container's own array rather than a copy
        String[] topics = cp.getTopics();
        topics[0] = "gaonb";
        // Restart the container so that it subscribes again
        listenerContainer.stop();
        listenerContainer.start();
    }
}
If anyone is looking to build a module that can be used as a library, check out https://github.com/313hemant313/TheGameKafka
A KafkaProducer and a KafkaConsumer can be obtained by clientName. Sample usage:
TheGameKafkaProducer<String, String> theGameKafkaProducer =
        theGameKafkaProducerFactory.getTheGameKafkaProducer("testClient");
theGameKafkaProducer.send("this is a test msg from TheGameKafkaProducer");

TheGameKafkaConsumer<String, String> theGameKafkaConsumer =
        theGameKafkaConsumerFactory.getTheGameKafkaConsumer("testClient");
theGameKafkaConsumer.listen(messageListener());

private MessageListener<String, String> messageListener() {
    return rec -> log.info("TheGameKafkaConsumer listened : {}", rec);
}
Sample config:
the-game-kafka:
  clientConsumerProperties:
    - clientName: "testClient"
      topic: "testTopic"
      enabled: true
      kafkaProperties:
        consumer:
          bootstrapServers: localhost:9094
          groupId: "tgk-group"
          autoOffset: earliest
          keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          properties:
            spring:
              json:
                trusted:
                  packages: "*"
              deserializer:
                value:
                  delegate:
                    class: org.apache.kafka.common.serialization.StringDeserializer
  clientProducerProperties:
    - clientName: "testClient"
      topic: "testTopic"
      enabled: true
      kafkaProperties:
        producer:
          bootstrapServers: localhost:9094
Inspired by https://github.com/pkgonan/kafka-listener.