I have an application that may need multiple producers. All code samples I see seem to support a single producer, reading config from app during app startup. If there are multiple producers and we want to pass in different producer config, is there out of the box support in Spring? Or should I just go without spring in that case?
You can create several Producer
instances (KafkaTemplate
) via the same ProducerFactory
.
If you need different Kafka configurations, you’ll need different ProducerFactory
instances.
you will have to create two different ProducerFactory
below is example
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> confluentProducerFactory() {
HashMap<String, Object> configProps = new HashMap<String, Object>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public ProducerFactory<String, String> cloudraProducerFactory() {
HashMap<String, Object> configProps = new HashMap<String, Object>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9094");
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean(name = "confluent")
public KafkaTemplate<String, String> confluentKafkaTemplate() {
return new KafkaTemplate<>(confluentProducerFactory());
}
@Bean(name = "cloudera")
public KafkaTemplate<String, String> clouderaKafkaTemplate() {
return new KafkaTemplate<>(cloudraProducerFactory());
}
}
public class ProducerExample {
@Autowired
@Qualifier("cloudera")
private KafkaTemplate clouderaKafkaTemplate;
@Autowired
@Qualifier("confluent")
private KafkaTemplate confluentKafkaTemplate;
public void send() {
confluentKafkaTemplate.send("TestConfluent", "hey there..confluent");
clouderaKafkaTemplate.send("TestCloudEra","hey there.. cloudera");
}
}
You can create several Producer
instances (KafkaTemplate
) via the same ProducerFactory
.
If you need different Kafka configurations, you’ll need different ProducerFactory
instances.
If you still want to keep your configuration in application.yaml
as usual, and keep Java configuration as minimum as possible, you can extend KafkaProperties.Producer
.
@Configuration
@ConfigurationProperties(prefix = "spring.kafka.producer-1")
@RequiredArgsConstructor
class FirstProducer extends KafkaProperties.Producer {
private final KafkaProperties common;
@Qualifier("producer-1")
@Bean
public ProducerFactory<?, ?> producerFactory() {
final var conf = new HashMap<>(
this.common.buildProducerProperties()
);
conf.putAll(this.buildProperties());
return new DefaultKafkaProducerFactory<>(conf);
}
@Qualifier("producer-1")
@Bean
public KafkaTemplate<?, ?> kafkaTemplate() {
return new KafkaTemplate<>(this.producerFactory());
}
}
@Configuration
@ConfigurationProperties(prefix = "spring.kafka.producer-2")
@RequiredArgsConstructor
class SecondProducer extends KafkaProperties.Producer {
private final KafkaProperties common;
@Qualifier("producer-2")
@Bean
public ProducerFactory<?, ?> producerFactory() {
final var conf = new HashMap<>(
this.common.buildProducerProperties()
);
conf.putAll(this.buildProperties());
return new DefaultKafkaProducerFactory<>(conf);
}
@Qualifier("producer-2")
@Bean
public KafkaTemplate<?, ?> kafkaTemplate() {
return new KafkaTemplate<>(this.producerFactory());
}
}
Starting with version 2.5, you can use a RoutingKafkaTemplate to select the producer at runtime, based on the destination topic name. https://docs.spring.io/spring-kafka/reference/html/#routing-template
Spring boot doesn't provide out of the box support for multiple producer configuration. You can write your own custom kafka configuration to support multiple producer config something like this:-
kafka:
producer:
producer1:
topic: topic1
bootstrap-servers: server1:9092,server1:9093,server1:9094
retries: 0
acks: all
producer2:
topic: topic2
bootstrap-servers: server2:9092,server2:9093,server2:9094
retries: 2
acks: 1
producer3:
...
producer4:
...
Read the configuration from class file:-
@Configuration
@ConfigurationProperties(prefix = "kafka")
@Getter
@Setter
public class KafkaCustomProperties {
private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
private String clientId;
private Map<String, String> properties = new HashMap<>();
private Map<String, KafkaProperties.Producer> producer;
private Map<String, KafkaProperties.Consumer> consumer;
private KafkaProperties.Ssl ssl = new KafkaProperties.Ssl();
private KafkaProperties.Security security = new KafkaProperties.Security();
public Map<String, Object> buildCommonProperties() {
Map<String, Object> properties = new HashMap<>();
if (this.bootstrapServers != null) {
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
}
if (this.clientId != null) {
properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
}
properties.putAll(this.ssl.buildProperties());
properties.putAll(this.security.buildProperties());
if (!CollectionUtils.isEmpty(this.properties)) {
properties.putAll(this.properties);
}
return properties;
}
}
use this configuration to generate KafkaTemplate
beans for each producer using @Qualifier
annotation
@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaMultipleProducerConfig {
private final KafkaCustomProperties kafkaCustomProperties;
@Bean
@Qualifier("producer1")
public KafkaTemplate<String, Object> producer1KafkaTemplate() {
return new KafkaTemplate<>(producerFactory("producer1"));
}
@Bean
@Qualifier("producer2")
public KafkaTemplate<String, Object> producer2KafkaTemplate() {
return new KafkaTemplate<>(producerFactory("producer2"));
}
private ProducerFactory<String, Object> producerFactory(String producerName) {
Map<String, Object> properties = new HashMap<>(kafkaCustomProperties.buildCommonProperties());
if (nonNull(kafkaCustomProperties.getProducer())) {
KafkaProperties.Producer producerProperties = kafkaCustomProperties.getProducer().get(producerName);
if (nonNull(producerProperties)) {
properties.putAll(producerProperties.buildProperties());
}
}
log.info("Kafka Producer '{}' properties: {}", producerName, properties);
return new DefaultKafkaProducerFactory<>(properties);
}
}
and use these KafkaTemplate
beans to publish message to different producer config.
Refer to the post https://codingnconcepts.com/spring-boot/configure-multiple-kafka-producer/ for detailed explanation.
© 2022 - 2025 — McMap. All rights reserved.