spring-kafka application.properties configuration for JAAS/SASL not working

Use Case:
I am using Spring Boot 2.2.5.RELEASE and Kafka 2.4.1.
The JAAS/SASL configuration on Kafka/ZooKeeper is correct: topics are created without issue using kafka-topics.bat.

Issue:
When I start the Spring Boot application, I immediately get the following errors:

kafka-server-start.bat console:
INFO [SocketServer brokerId=1] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

IDE console:
WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=yyy] Bootstrap broker localhost:9093 (id: -3 rack: null) disconnected

My application.properties configuration:

spring.kafka.jaas.enabled=true
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="spring_bO0t" password="i_am_a_spring_bO0t_user";

kafka_server_jaas.conf:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="12345"
    user_admin="12345"
    user_spring_bO0t="i_am_a_spring_bO0t_user";
};

Am I missing something?

Thanks in advance.

Folketing answered 24/3, 2020 at 5:21 Comment(0)

I defined the properties in the wrong place, i.e. in application.properties. Because I define my own ProducerFactory and ConsumerFactory beans, Spring Boot's auto-configured factories back off, so the spring.kafka.* settings in application.properties are never applied to the factories I create.

Configuring the same properties in the bean definitions resolved the issue, i.e. move the properties from application.properties to where you define your beans.

Here's an example:

@Bean
public ProducerFactory<Object, Object> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
            "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "username", "password"
    ));
        
    return new DefaultKafkaProducerFactory<>(props);
}

@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
        "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "username", "password"
    ));

    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    
    configs.put("security.protocol", "SASL_PLAINTEXT");
    configs.put("sasl.mechanism", "PLAIN");
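    // Quote the values and keep a space between the options, otherwise the JAAS string is malformed.
    configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                                    "username=\"username\" " +
                                    "password=\"password\";");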
    
    return new KafkaAdmin(configs);
}
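
The JAAS string above is easy to get wrong when it is concatenated by hand (a missing space or quote is enough to break authentication). A minimal sketch of a helper that builds it in one place follows. SaslJaasConfig and its plain method are hypothetical names, not part of spring-kafka or kafka-clients, and the escaping assumes the client tokenizes the value with the usual backslash escapes inside double quotes.

```java
import java.util.Objects;

// Hypothetical helper, not part of spring-kafka or kafka-clients.
final class SaslJaasConfig {
    private static final String PLAIN_LOGIN_MODULE =
            "org.apache.kafka.common.security.plain.PlainLoginModule";

    private SaslJaasConfig() {}

    // Builds the value for the "sasl.jaas.config" client property (PLAIN mechanism).
    static String plain(String username, String password) {
        Objects.requireNonNull(username, "username");
        Objects.requireNonNull(password, "password");
        return String.format("%s required username=\"%s\" password=\"%s\";",
                PLAIN_LOGIN_MODULE, escape(username), escape(password));
    }

    // Escape backslashes and double quotes so each value stays a single quoted token.
    // Assumption: the parser honours backslash escapes inside double quotes.
    private static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

In the beans above this could be used as props.put(SaslConfigs.SASL_JAAS_CONFIG, SaslJaasConfig.plain(user, pass)), so the producer, consumer and admin configurations all share one correctly quoted string.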
Folketing answered 24/3, 2020 at 6:35 Comment(2)
If you still get an authentication error, you can also try ScramLoginModule.class instead of PlainLoginModule.class, since your Kafka cluster may be configured for SCRAM. - Bonnee
"As i have ProducerFactory & ConsumerFactory beans, those application.properties will be ignored by Spring Boot"... I wonder whether you found any reference in the docs saying that the properties will be ignored? It was helpful, though. Thanks. - Collard
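
The first comment's point is that the login module has to match the sasl.mechanism the broker expects. A minimal sketch of that mapping follows. LoginModules is a hypothetical helper name, and only the PLAIN and SCRAM mechanisms mentioned in this thread are covered.

```java
// Hypothetical helper: picks the JAAS login module class name for a SASL mechanism.
final class LoginModules {
    private LoginModules() {}

    static String forMechanism(String mechanism) {
        if (mechanism == null) {
            throw new IllegalArgumentException("mechanism must not be null");
        }
        switch (mechanism) {
            case "PLAIN":
                return "org.apache.kafka.common.security.plain.PlainLoginModule";
            case "SCRAM-SHA-256":
            case "SCRAM-SHA-512":
                return "org.apache.kafka.common.security.scram.ScramLoginModule";
            default:
                // Other mechanisms (e.g. GSSAPI, OAUTHBEARER) are out of scope for this sketch.
                throw new IllegalArgumentException("Unsupported mechanism: " + mechanism);
        }
    }
}
```

The returned class name would replace PlainLoginModule at the start of the sasl.jaas.config value, with sasl.mechanism set to the same mechanism string.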

The answer provided by @jumping_monkey is correct; however, I didn't know where to put those configurations in the ProducerFactory & ConsumerFactory beans, so I'll leave an example below for those who want to know:

In your ProducerFactory or ConsumerFactory bean, respectively (mine is named generalMessageProducerFactory):

@Bean
public ProducerFactory<String, GeneralMessageDto> generalMessageProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    configProps.put("sasl.mechanism", "PLAIN");
    configProps.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule   required username='YOUR_KAFKA_CLUSTER_USERNAME'   password='YOUR_KAFKA_CLUSTER_PASSWORD';");
    configProps.put("security.protocol", "SASL_SSL");
    return new DefaultKafkaProducerFactory<>(configProps);
}

And also in the kafkaAdmin method of your topic configuration class:

@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configs.put("sasl.mechanism", "PLAIN");
    configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule   required username='YOUR_KAFKA_CLUSTER_USERNAME'   password='YOUR_KAFKA_CLUSTER_PASSWORD';");
    configs.put("security.protocol", "SASL_SSL");
    return new KafkaAdmin(configs);
}
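
Since the same three security settings are repeated in every bean, they could be built once and merged in with putAll. This is only a sketch: SaslClientProps and withSaslSsl are hypothetical names, and the string keys are the same ones used in the beans above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: adds the shared SASL_SSL/PLAIN settings to any client config map.
final class SaslClientProps {
    private SaslClientProps() {}

    // Returns a new map containing everything in 'base' plus the three security settings.
    static Map<String, Object> withSaslSsl(Map<String, Object> base, String username, String password) {
        Map<String, Object> props = new HashMap<>(base);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + username + "\" password=\"" + password + "\";");
        return props;
    }
}
```

Each bean would then end with something like new DefaultKafkaProducerFactory<>(SaslClientProps.withSaslSsl(configProps, user, pass)), and the admin bean with new KafkaAdmin(SaslClientProps.withSaslSsl(configs, user, pass)).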

Hope this was helpful guys!

Ductile answered 21/10, 2020 at 4:50 Comment(0)

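If you rely on Spring Boot's auto-configuration (i.e. you do not define your own ProducerFactory/ConsumerFactory beans), you can add three properties under spring.kafka in your YAML file.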

spring:
  kafka:
    bootstrap-servers: ${KAFKA_URL:localhost:9092}
    properties:
      security.protocol: SASL_PLAINTEXT
      sasl.mechanism: PLAIN
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="${KAFKA_USER:user}" password="${KAFKA_PASSWORD:pass}";
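
For readers who build the client properties in their own beans, the ${NAME:default} fallback used in the YAML can be approximated in plain Java. This is a rough sketch: KafkaEnvSettings is a hypothetical name, the variable names and defaults are copied from the YAML above, and the environment is passed in as a map so the method is easy to test.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: mirrors the YAML above, falling back to defaults when a variable is unset.
final class KafkaEnvSettings {
    private KafkaEnvSettings() {}

    static Map<String, Object> fromEnv(Map<String, String> env) {
        String user = env.getOrDefault("KAFKA_USER", "user");
        String password = env.getOrDefault("KAFKA_PASSWORD", "pass");

        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", env.getOrDefault("KAFKA_URL", "localhost:9092"));
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + user + "\" password=\"" + password + "\";");
        return props;
    }
}
```

A factory bean could call KafkaEnvSettings.fromEnv(System.getenv()) and add its serializer settings on top of the returned map.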
Mckelvey answered 24/10, 2023 at 6:52 Comment(0)
