Can we use multiple KafkaTemplates in Spring Boot?

In my Spring Boot Kafka publisher application, I want to support publishing messages both as String (JSON) and as bytes, because I need to support both JSON and Avro. But the KafkaTemplate in Spring Boot seems to let us define only one of the two. Is there a way to use both templates, or some other way to support both JSON and Avro?

KafkaTemplate<String, String> works only for strings, but I also want to publish Avro, which would need something like KafkaTemplate<String, byte[]>.

Cimex answered 12/9, 2019 at 10:44 Comment(1)
Um... create a new bean for it? Even if both beans were of the same type, you could use the @Qualifier or @Primary annotations. This should not be a problem at all. – Storehouse
D
13

You could try to create KafkaTemplate with different configs:

@Bean
public ProducerFactory<String, String> producerFactoryString() {
    Map<String, Object> configProps = new HashMap<>();
    //additional config parameters .... 
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public ProducerFactory<String, byte[]> producerFactoryByte() {
    Map<String, Object> configProps = new HashMap<>();
    //additional config parameters ....
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
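    // ByteArraySerializer handles byte[] values; BytesSerializer expects org.apache.kafka.common.utils.Bytes
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);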
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplateString() {
    return new KafkaTemplate<>(producerFactoryString());
}

@Bean
public KafkaTemplate<String, byte[]> kafkaTemplateByte() {
    return new KafkaTemplate<>(producerFactoryByte());
}
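
As a side note on what the two templates actually send: Kafka's StringSerializer encodes strings as UTF-8 by default, so a JSON string and its UTF-8 bytes end up as the same record value. The sketch below only shows that conversion in plain Java; the class and method names are made up for illustration. Assuming the byte[] template is configured with ByteArraySerializer, it could therefore carry JSON payloads as well, should a single template ever be preferable.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of Spring Kafka or the Kafka client. */
final class PayloadEncoding {

    private PayloadEncoding() {
    }

    /** Encodes a JSON document as UTF-8, the default charset of Kafka's StringSerializer. */
    public static byte[] jsonToBytes(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a UTF-8 payload back into the JSON text it was built from. */
    public static String bytesToJson(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```

With this, something like kafkaTemplateByte().send(topic, key, PayloadEncoding.jsonToBytes(json)) would publish the same bytes that the String template produces for json.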
Disassemble answered 12/9, 2019 at 10:51 Comment(1)
"Version 2.3 introduced the DelegatingSerializer and DelegatingDeserializer, which allow producing and consuming records with different key and/or value types." – Haustorium
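
To make the idea in that quote concrete: the real DelegatingSerializer picks a delegate serializer based on a selector carried with each record. The following is only a plain-Java illustration of that delegation pattern, not the Spring Kafka class or its configuration; the selector names "string" and "bytes" and the class name are assumptions chosen for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Function;

/** Illustration of selector-based delegation; hypothetical, not the Spring Kafka API. */
final class SelectorDelegation {

    // Hypothetical selector names mapped to the encoder that handles that value type.
    private static final Map<String, Function<Object, byte[]>> DELEGATES = Map.of(
            "string", value -> ((String) value).getBytes(StandardCharsets.UTF_8),
            "bytes", value -> (byte[]) value);

    private SelectorDelegation() {
    }

    /** Looks up the encoder registered under the selector and applies it to the value. */
    public static byte[] serialize(String selector, Object value) {
        Function<Object, byte[]> delegate = DELEGATES.get(selector);
        if (delegate == null) {
            throw new IllegalArgumentException("No serializer registered for selector: " + selector);
        }
        return delegate.apply(value);
    }
}
```

The practical consequence is that one producer (and so one template) can send both String and byte[] values, at the cost of tagging each record with the selector that names its serializer.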
Y
0

You can create a Kafka configuration class with one producer factory and one template per target. In my case, I had to send data to two different servers:

@Configuration
public class KafkaConfig {

    private final MosaicKafkaConfig mosaicKafkaConfig;
    private final StreamKafkaConfig streamKafkaConfig;

    public KafkaConfig(MosaicKafkaConfig mosaicKafkaConfig, StreamKafkaConfig streamKafkaConfig) {
        this.mosaicKafkaConfig = mosaicKafkaConfig;
        this.streamKafkaConfig = streamKafkaConfig;
    }

    @Bean
    public ProducerFactory<?, ?> kafkaProducerFactoryForMosaic() {
        KafkaProperties kafkaProperties = new KafkaProperties();
        KafkaProperties.Ssl ssl = kafkaProperties.getSsl();
        ResourceLoader resourceLoader = new DefaultResourceLoader();
        Resource resource = resourceLoader.getResource(mosaicKafkaConfig.getSslTrustStoreLocation());
        ssl.setTrustStoreLocation(resource);
        ssl.setTrustStorePassword(mosaicKafkaConfig.getSslTrustStorePassword());
        ssl.setTrustStoreType(mosaicKafkaConfig.getSslTrustStoreType());
        Map<String, String> props = kafkaProperties.getProperties();
        props.put("sasl.jaas.config", mosaicKafkaConfig.getSaslConfig());
        props.put("sasl.mechanism", mosaicKafkaConfig.getSaslMechanism());
        props.put("security.protocol", mosaicKafkaConfig.getSaslSecProtocol());

        kafkaProperties.getProducer().setValueSerializer(mosaicKafkaConfig.getValaueSerializer());
        kafkaProperties.getProducer().setClientId(mosaicKafkaConfig.getClientID());
        kafkaProperties.getProducer().setBootstrapServers(mosaicKafkaConfig.getBootstrapServers());

        Map<String, Object> configProps = kafkaProperties.buildProducerProperties();
        return new DefaultKafkaProducerFactory<>(configProps);

    }

    @Bean
    public KafkaTemplate<?, ?> kafkaTemplateForMosaic(ProducerFactory<Object, Object> kafkaProducerFactoryForMosaic) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactoryForMosaic);
        return kafkaTemplate;
    }


    @Bean
    public ProducerFactory<?, ?> kafkaProducerFactoryForStream() {
        KafkaProperties kafkaProperties = new KafkaProperties();
        KafkaProperties.Ssl ssl = kafkaProperties.getSsl();
        ResourceLoader resourceLoader = new DefaultResourceLoader();
        Resource resource = resourceLoader.getResource(streamKafkaConfig.getSslTrustStoreLocation());
        ssl.setTrustStoreLocation(resource);
        ssl.setTrustStorePassword(streamKafkaConfig.getSslTrustStorePassword());
        ssl.setTrustStoreType(streamKafkaConfig.getSslTrustStoreType());
        Map<String, String> props = kafkaProperties.getProperties();
        props.put("sasl.jaas.config", streamKafkaConfig.getSaslConfig());
        props.put("sasl.mechanism", streamKafkaConfig.getSaslMechanism());
        props.put("security.protocol", streamKafkaConfig.getSaslSecProtocol());

        kafkaProperties.getProducer().setValueSerializer(streamKafkaConfig.getValaueSerializer());
        kafkaProperties.getProducer().setClientId(streamKafkaConfig.getClientID());
        kafkaProperties.getProducer().setBootstrapServers(streamKafkaConfig.getBootstrapServers());

        Map<String, Object> configProps = kafkaProperties.buildProducerProperties();
        return new DefaultKafkaProducerFactory<>(configProps);

    }

    @Bean
    public KafkaTemplate<?, ?> kafkaTemplateForStream(ProducerFactory<Object, Object> kafkaProducerFactoryForStream) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactoryForStream);
        return kafkaTemplate;
    }
}
Yetty answered 3/3, 2021 at 18:12 Comment(2)
Can it be done only via the application.yml file? – Permanence
Only one template can be configured via application.yml. As soon as you include a bean of type KafkaTemplate in your app, the auto-configuration backs off (@ConditionalOnMissingBean(KafkaTemplate.class)), and you'd have to create two beans yourself (the one that "broke" auto-configuration and the default one). – Nellnella
B
0

Just extending the answer from @i.bondarenko

If you want to configure all the properties from a property file, you can write your own custom Kafka configuration to support multiple producer configs, something like this:

kafka:
  bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
  producer:
    producer-string:
      retries: 0
      acks: all
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    producer-byte:
      retries: 0
      acks: all
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer

Bind the configuration to a properties class:

@Configuration
@ConfigurationProperties(prefix = "kafka")
@Getter
@Setter
public class KafkaCustomProperties {
  private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
  private String clientId;
  private Map<String, String> properties = new HashMap<>();
  private Map<String, KafkaProperties.Producer> producer;
  private Map<String, KafkaProperties.Consumer> consumer;
  private KafkaProperties.Ssl ssl = new KafkaProperties.Ssl();
  private KafkaProperties.Security security = new KafkaProperties.Security();

  public Map<String, Object> buildCommonProperties() {
    Map<String, Object> properties = new HashMap<>();
    if (this.bootstrapServers != null) {
      properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    }
    if (this.clientId != null) {
      properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
    }
    properties.putAll(this.ssl.buildProperties());
    properties.putAll(this.security.buildProperties());
    if (!CollectionUtils.isEmpty(this.properties)) {
      properties.putAll(this.properties);
    }
    return properties;
  }
}

Use this configuration to generate a KafkaTemplate bean for each producer, distinguished by the @Qualifier annotation:

@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaMultipleProducerConfig {

  private final KafkaCustomProperties kafkaCustomProperties;

  @Bean
  @Qualifier("producer-string")
  public KafkaTemplate<String, Object> producerStringKafkaTemplate() {
    return new KafkaTemplate<>(producerFactory("producer-string"));
  }

  @Bean
  @Qualifier("producer-byte")
  public KafkaTemplate<String, Object> producerByteKafkaTemplate() {
    return new KafkaTemplate<>(producerFactory("producer-byte"));
  }

  private ProducerFactory<String, Object> producerFactory(String producerName) {
    Map<String, Object> properties = new HashMap<>(kafkaCustomProperties.buildCommonProperties());
    if (nonNull(kafkaCustomProperties.getProducer())) {
      KafkaProperties.Producer producerProperties = kafkaCustomProperties.getProducer().get(producerName);
      if (nonNull(producerProperties)) {
        properties.putAll(producerProperties.buildProperties());
      }
    }
    log.info("Kafka Producer '{}' properties: {}", producerName, properties);
    return new DefaultKafkaProducerFactory<>(properties);
  }
}

Then use these KafkaTemplate beans to publish messages with the different producer configs.
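
A minimal sketch of how the two beans might be injected, since both have the same type and Spring needs the qualifier to tell them apart. The MessagePublisher class and its method names are hypothetical. It also assumes the value serializer of the "producer-byte" config accepts byte[] (ByteArraySerializer); with BytesSerializer the payload would have to be wrapped in org.apache.kafka.common.utils.Bytes first.

```java
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Hypothetical service; only the qualifier values come from the config above.
@Service
public class MessagePublisher {

    private final KafkaTemplate<String, Object> stringTemplate;
    private final KafkaTemplate<String, Object> byteTemplate;

    public MessagePublisher(
            @Qualifier("producer-string") KafkaTemplate<String, Object> stringTemplate,
            @Qualifier("producer-byte") KafkaTemplate<String, Object> byteTemplate) {
        this.stringTemplate = stringTemplate;
        this.byteTemplate = byteTemplate;
    }

    // JSON goes out through the template whose value serializer is StringSerializer.
    public void publishJson(String topic, String key, String json) {
        stringTemplate.send(topic, key, json);
    }

    // Already-encoded Avro bytes go out through the byte-oriented template.
    public void publishAvro(String topic, String key, byte[] avroBytes) {
        byteTemplate.send(topic, key, avroBytes);
    }
}
```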

Refer to the post https://codingnconcepts.com/spring-boot/configure-multiple-kafka-producer/ for detailed explanation.

Bradski answered 31/8, 2023 at 1:29 Comment(0)