Is there a code sample for multiple producers in spring kafka?
Asked Answered
O

5

15

I have an application that may need multiple producers. All code samples I see seem to support a single producer, reading config from app during app startup. If there are multiple producers and we want to pass in different producer config, is there out of the box support in Spring? Or should I just go without spring in that case?

Oology answered 24/2, 2018 at 3:42 Comment(0)
L
12

You can create several Producer instances (KafkaTemplate) via the same ProducerFactory.

If you need different Kafka configurations, you’ll need different ProducerFactory instances.

Lukas answered 24/2, 2018 at 3:52 Comment(0)
G
24

you will have to create two different ProducerFactory below is example

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;

    import java.util.HashMap;

    @Configuration
    public class KafkaProducerConfig {


        @Bean
        public ProducerFactory<String, String> confluentProducerFactory() {

            HashMap<String, Object> configProps = new HashMap<String, Object>();
            configProps.put(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "localhost:9092");
            configProps.put(
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            configProps.put(
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }


        @Bean
        public ProducerFactory<String, String> cloudraProducerFactory() {

            HashMap<String, Object> configProps = new HashMap<String, Object>();
            configProps.put(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "localhost:9094");
            configProps.put(
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            configProps.put(
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }

        @Bean(name = "confluent")
        public KafkaTemplate<String, String> confluentKafkaTemplate() {
            return new KafkaTemplate<>(confluentProducerFactory());
        }

        @Bean(name = "cloudera")
        public KafkaTemplate<String, String> clouderaKafkaTemplate() {
            return new KafkaTemplate<>(cloudraProducerFactory());
        }

    }




public class ProducerExample {

    @Autowired
    @Qualifier("cloudera")
    private KafkaTemplate clouderaKafkaTemplate;


    @Autowired
    @Qualifier("confluent")
    private KafkaTemplate confluentKafkaTemplate;

    public void send() {
        confluentKafkaTemplate.send("TestConfluent", "hey there..confluent");
        clouderaKafkaTemplate.send("TestCloudEra","hey there.. cloudera");
    }

}
Gantt answered 23/4, 2019 at 9:18 Comment(3)
and multiple consumer ?Henriquez
Can different producer factories(ie producer kafka config) be defined in application properties?Chumash
@Chumash Yes, see my answer below.Eustatius
L
12

You can create several Producer instances (KafkaTemplate) via the same ProducerFactory.

If you need different Kafka configurations, you’ll need different ProducerFactory instances.

Lukas answered 24/2, 2018 at 3:52 Comment(0)
E
8

If you still want to keep your configuration in application.yaml as usual, and keep Java configuration as minimum as possible, you can extend KafkaProperties.Producer.


@Configuration
@ConfigurationProperties(prefix = "spring.kafka.producer-1")
@RequiredArgsConstructor
class FirstProducer extends KafkaProperties.Producer {
    private final KafkaProperties common;

    @Qualifier("producer-1")
    @Bean
    public ProducerFactory<?, ?> producerFactory() {
        final var conf = new HashMap<>(
            this.common.buildProducerProperties()
        );
        conf.putAll(this.buildProperties());
        return new DefaultKafkaProducerFactory<>(conf);

    }

    @Qualifier("producer-1")
    @Bean
    public KafkaTemplate<?, ?> kafkaTemplate() {
        return new KafkaTemplate<>(this.producerFactory());

    }
}

@Configuration
@ConfigurationProperties(prefix = "spring.kafka.producer-2")
@RequiredArgsConstructor
class SecondProducer extends KafkaProperties.Producer {
    private final KafkaProperties common;

    @Qualifier("producer-2")
    @Bean
    public ProducerFactory<?, ?> producerFactory() {
        final var conf = new HashMap<>(
            this.common.buildProducerProperties()
        );
        conf.putAll(this.buildProperties());
        return new DefaultKafkaProducerFactory<>(conf);

    }

    @Qualifier("producer-2")
    @Bean
    public KafkaTemplate<?, ?> kafkaTemplate() {
        return new KafkaTemplate<>(this.producerFactory());

    }
}

Eustatius answered 21/12, 2021 at 13:8 Comment(2)
That looks nice! However, would there exist a way to do away with all of the boilerplate code? The only difference I can spot is the name of the properties prefix.Batter
You can extract shared parts to a superclass, here code is duplicated just as an illustationEustatius
C
4

Starting with version 2.5, you can use a RoutingKafkaTemplate to select the producer at runtime, based on the destination topic name. https://docs.spring.io/spring-kafka/reference/html/#routing-template

Calculator answered 19/9, 2021 at 8:46 Comment(0)
M
1

Spring boot doesn't provide out of the box support for multiple producer configuration. You can write your own custom kafka configuration to support multiple producer config something like this:-

kafka:
  producer:
    producer1:
      topic: topic1
      bootstrap-servers: server1:9092,server1:9093,server1:9094
      retries: 0
      acks: all
    producer2:
      topic: topic2
      bootstrap-servers: server2:9092,server2:9093,server2:9094
      retries: 2
      acks: 1
    producer3:
      ...
    producer4:
      ...

Read the configuration from class file:-

@Configuration
@ConfigurationProperties(prefix = "kafka")
@Getter
@Setter
public class KafkaCustomProperties {
  private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
  private String clientId;
  private Map<String, String> properties = new HashMap<>();
  private Map<String, KafkaProperties.Producer> producer;
  private Map<String, KafkaProperties.Consumer> consumer;
  private KafkaProperties.Ssl ssl = new KafkaProperties.Ssl();
  private KafkaProperties.Security security = new KafkaProperties.Security();

  public Map<String, Object> buildCommonProperties() {
    Map<String, Object> properties = new HashMap<>();
    if (this.bootstrapServers != null) {
      properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    }
    if (this.clientId != null) {
      properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
    }
    properties.putAll(this.ssl.buildProperties());
    properties.putAll(this.security.buildProperties());
    if (!CollectionUtils.isEmpty(this.properties)) {
      properties.putAll(this.properties);
    }
    return properties;
  }
}

use this configuration to generate KafkaTemplate beans for each producer using @Qualifier annotation

@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaMultipleProducerConfig {

  private final KafkaCustomProperties kafkaCustomProperties;

  @Bean
  @Qualifier("producer1")
  public KafkaTemplate<String, Object> producer1KafkaTemplate() {
    return new KafkaTemplate<>(producerFactory("producer1"));
  }

  @Bean
  @Qualifier("producer2")
  public KafkaTemplate<String, Object> producer2KafkaTemplate() {
    return new KafkaTemplate<>(producerFactory("producer2"));
  }

  private ProducerFactory<String, Object> producerFactory(String producerName) {
    Map<String, Object> properties = new HashMap<>(kafkaCustomProperties.buildCommonProperties());
    if (nonNull(kafkaCustomProperties.getProducer())) {
      KafkaProperties.Producer producerProperties = kafkaCustomProperties.getProducer().get(producerName);
      if (nonNull(producerProperties)) {
        properties.putAll(producerProperties.buildProperties());
      }
    }
    log.info("Kafka Producer '{}' properties: {}", producerName, properties);
    return new DefaultKafkaProducerFactory<>(properties);
  }
}

and use these KafkaTemplate beans to publish message to different producer config.

Refer to the post https://codingnconcepts.com/spring-boot/configure-multiple-kafka-producer/ for detailed explanation.

Mechanistic answered 31/8, 2023 at 0:37 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.