Just extending the answer from @i.bondarenko
If you want to configure all the properties from a property file, you can write your own custom Kafka configuration that supports multiple producer configs, something like this:

    kafka:
      # Shared by all producers; binds to KafkaCustomProperties.bootstrapServers
      bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
      producer:
        # Each key below becomes an entry in the Map<String, KafkaProperties.Producer>
        producer-string:
          retries: 0
          acks: all
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        producer-byte:
          retries: 0
          acks: all
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.BytesSerializer
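Spring Boot treats nested YAML as flat, dot-separated property keys, which is why the names under `kafka.producer` end up as keys of the producer map. The following is only a rough, Spring-free sketch of that flattening; the class name `YamlKeyFlattener` is made up for illustration, and it assumes `bootstrap-servers` sits directly under `kafka`, matching the `bootstrapServers` field of the properties class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Spring: shows how nested keys become dotted keys.
public class YamlKeyFlattener {

    // Recursively joins nested map keys with dots, e.g. {a={b=1}} becomes {a.b=1}.
    public static Map<String, Object> flatten(Map<String, Object> nested) {
        Map<String, Object> flat = new LinkedHashMap<>();
        collect("", nested, flat);
        return flat;
    }

    @SuppressWarnings("unchecked")
    private static void collect(String prefix, Map<String, Object> node, Map<String, Object> out) {
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            if (entry.getValue() instanceof Map) {
                collect(key, (Map<String, Object>) entry.getValue(), out);
            } else {
                out.put(key, entry.getValue());
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> producerString = new LinkedHashMap<>();
        producerString.put("retries", 0);
        producerString.put("acks", "all");

        Map<String, Object> producer = new LinkedHashMap<>();
        producer.put("producer-string", producerString);

        Map<String, Object> kafka = new LinkedHashMap<>();
        kafka.put("bootstrap-servers", "localhost:9092,localhost:9093,localhost:9094");
        kafka.put("producer", producer);

        Map<String, Object> root = new LinkedHashMap<>();
        root.put("kafka", kafka);

        // Prints:
        // kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094
        // kafka.producer.producer-string.retries=0
        // kafka.producer.producer-string.acks=all
        flatten(root).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With the prefix `kafka` stripped, the segment after `producer.` (here `producer-string`) is the map key, and the remaining segment is a property of that producer.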
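Bind this configuration to a properties class:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import lombok.Getter;
    import lombok.Setter;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.util.CollectionUtils;

    @Configuration
    @ConfigurationProperties(prefix = "kafka")
    @Getter
    @Setter
    public class KafkaCustomProperties {
        private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
        private String clientId;
        private Map<String, String> properties = new HashMap<>();
        // Keyed by producer name, e.g. "producer-string" and "producer-byte"
        private Map<String, KafkaProperties.Producer> producer;
        private Map<String, KafkaProperties.Consumer> consumer;
        private KafkaProperties.Ssl ssl = new KafkaProperties.Ssl();
        private KafkaProperties.Security security = new KafkaProperties.Security();

        // Builds the settings shared by every producer and consumer
        public Map<String, Object> buildCommonProperties() {
            Map<String, Object> properties = new HashMap<>();
            if (this.bootstrapServers != null) {
                properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
            }
            if (this.clientId != null) {
                properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
            }
            properties.putAll(this.ssl.buildProperties());
            properties.putAll(this.security.buildProperties());
            // Free-form entries under kafka.properties are applied last
            if (!CollectionUtils.isEmpty(this.properties)) {
                properties.putAll(this.properties);
            }
            return properties;
        }
    }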
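Use this configuration to create a KafkaTemplate bean for each producer, distinguished by the @Qualifier annotation:

    import static java.util.Objects.nonNull;
    import java.util.HashMap;
    import java.util.Map;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;

    @Configuration
    @RequiredArgsConstructor
    @Slf4j
    public class KafkaMultipleProducerConfig {
        private final KafkaCustomProperties kafkaCustomProperties;

        @Bean
        @Qualifier("producer-string")
        public KafkaTemplate<String, Object> producerStringKafkaTemplate() {
            return new KafkaTemplate<>(producerFactory("producer-string"));
        }

        // The value serializer here is BytesSerializer, so values sent through
        // this template must be org.apache.kafka.common.utils.Bytes instances.
        @Bean
        @Qualifier("producer-byte")
        public KafkaTemplate<String, Object> producerByteKafkaTemplate() {
            return new KafkaTemplate<>(producerFactory("producer-byte"));
        }

        // Starts from the common properties, then overlays the named producer's settings
        private ProducerFactory<String, Object> producerFactory(String producerName) {
            Map<String, Object> properties = new HashMap<>(kafkaCustomProperties.buildCommonProperties());
            if (nonNull(kafkaCustomProperties.getProducer())) {
                KafkaProperties.Producer producerProperties = kafkaCustomProperties.getProducer().get(producerName);
                if (nonNull(producerProperties)) {
                    properties.putAll(producerProperties.buildProperties());
                }
            }
            log.info("Kafka Producer '{}' properties: {}", producerName, properties);
            return new DefaultKafkaProducerFactory<>(properties);
        }
    }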
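The order of the two steps in `producerFactory` matters: the common properties go in first and the named producer's properties are put on top, so a producer-specific value wins when both define the same key. Below is a minimal, Spring-free sketch of just that layering, using plain maps in place of `KafkaProperties.Producer`. The class name `ProducerPropertyMerge` and the shared `acks` default of `1` are assumptions made up for illustration and are not part of the configuration above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the merge logic inside producerFactory(...).
public class ProducerPropertyMerge {

    // Common settings first, then the named producer's settings on top.
    // An unknown producer name simply yields the common settings.
    public static Map<String, Object> merge(Map<String, Object> common,
                                            Map<String, Map<String, Object>> producers,
                                            String producerName) {
        Map<String, Object> merged = new HashMap<>(common);
        if (producers != null) {
            Map<String, Object> specific = producers.get(producerName);
            if (specific != null) {
                merged.putAll(specific);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> common = new HashMap<>();
        common.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        common.put("acks", "1"); // assumed shared default, for illustration only

        Map<String, Object> producerByte = new HashMap<>();
        producerByte.put("acks", "all");
        producerByte.put("value.serializer",
                "org.apache.kafka.common.serialization.BytesSerializer");

        Map<String, Map<String, Object>> producers = new HashMap<>();
        producers.put("producer-byte", producerByte);

        System.out.println(merge(common, producers, "producer-byte"));
        System.out.println(merge(common, producers, "unknown-producer"));
    }
}
```

In this sketch `acks` resolves to `all` for `producer-byte` and stays at the shared `1` for a name that has no entry, which mirrors how a missing producer entry in the real configuration falls back to the common properties only.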
Then inject these KafkaTemplate beans by their qualifier to publish messages with the different producer configurations.
Refer to the post https://codingnconcepts.com/spring-boot/configure-multiple-kafka-producer/ for a detailed explanation.