How to configure a Kafka consumer with SASL mechanism PLAIN and security protocol SASL_SSL in Java?

I want to create a Kafka consumer that uses the security protocol SASL_SSL and the SASL mechanism PLAIN. Can someone help me configure these details?

I have read many documents on how to configure the SASL details but still don't have a clear picture of how to do it. Here is the code I used to create the Kafka consumer:

Properties props = new Properties();
props.put("bootstrap.servers", "servers");
String consumeGroup = "consumer_group";
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\"");
props.put("group.id", consumeGroup);
props.put("client.id", "client_id");
props.put("security.protocol", "SASL_SSL");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "101");
props.put("max.partition.fetch.bytes", "135");
// props.put("auto.offset.reset", "earliest");
props.put("heartbeat.interval.ms", "3000");
props.put("session.timeout.ms", "6001");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer<String, String>(props);

Stacktrace

    14:56:12.767 [main] DEBUG o.a.k.clients.consumer.KafkaConsumer - Starting the Kafka consumer
    14:56:12.776 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [Node(-2, kafka-events-nonprod-ds1.i, 9092), Node(-3, kafka-events-nonprod-ds1-3.io, 9092), Node(-1, kafka-events-nonprod-ds1-1.io, 9092)], partitions = [])
    14:56:12.789 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name connections-closed:client-id-client_id
    14:56:12.845 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name connections-created:client-id-client_id
    14:56:12.846 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:client-id-client_id
    14:56:12.846 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:client-id-client_id
    14:56:12.847 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-received:client-id-client_id
    14:56:12.847 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name select-time:client-id-client_id
    14:56:12.847 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name io-time:client-id-client_id
    14:56:12.861 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name heartbeat-latency
    14:56:12.862 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name join-latency
    14:56:12.862 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name sync-latency
    14:56:12.865 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name commit-latency
    14:56:12.873 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-fetched
    14:56:12.874 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name records-fetched
    14:56:12.879 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name fetch-latency
    14:56:12.881 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name records-lag
    14:56:12.882 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name fetch-throttle-time
    14:56:12.883 [main] WARN  o.a.k.c.consumer.ConsumerConfig - The configuration sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password" was supplied but isn't a known config.
    14:56:12.885 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0
    14:56:12.885 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a
    14:56:12.886 [main] DEBUG o.a.k.clients.consumer.KafkaConsumer - Kafka consumer created
    14:56:12.887 [main] DEBUG o.a.k.clients.consumer.KafkaConsumer - Subscribed to topic(s): topic_name
    14:56:12.887 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Issuing group metadata request to broker -2
    14:56:12.918 [main] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node -2 at kafka-events-nonprod-ds1.i:9092.
    14:56:13.336 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--2.bytes-sent
    14:56:13.336 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--2.bytes-received
    14:56:13.337 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--2.latency
    14:56:13.339 [main] DEBUG o.apache.kafka.clients.NetworkClient - Completed connection to node -2
    14:56:13.343 [main] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=1,client_id=client_id}, body={topics=[topic_name]}), isInitiatedByNetworkClient, createdTimeMs=1568193973342, sendTimeMs=0) to node -2
    14:56:13.986 [main] DEBUG o.a.kafka.common.network.Selector - Connection with kafka-events-nonprod-ds1-2.octanner.i/10.84.20.85 disconnected
    java.io.EOFException: null
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99) ~[kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) ~[kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:160) ~[kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:141) ~[kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.common.network.Selector.poll(Selector.java:286) ~[kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270) [kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303) [kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197) [kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187) [kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:126) [kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:186) [kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:857) [kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829) [kafka-clients-0.9.0.0.jar:na]
        at kafka.Consumer.processRecords(Consumer.java:54) [classes/:na]
        at kafka.Consumer.execute(Consumer.java:22) [classes/:na]
        at kafka.Consumer.main(Consumer.java:15) [classes/:na]

Deserialize function:

private static void processRecords(KafkaConsumer<String, Object> consumer) throws InterruptedException {
    while (true) {
        ConsumerRecords<String, Object> records = consumer.poll(TimeUnit.MINUTES.toMillis(1));
        long lastOffset = 0;
        for (ConsumerRecord<String, Object> record : records) {
            // Note: this prints record.value(), not the key
            System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
            lastOffset = record.offset();
        }
        System.out.println("lastOffset read: " + lastOffset);
        process();
    }
}
Croaker answered 11/9, 2019 at 9:34 Comment(0)

Support for the PLAIN mechanism was added in Kafka 0.10. Kafka 0.9, the version you are using, only supported the GSSAPI mechanism.

Once you've switched to a more recent version, you just need to set at least the following configurations:

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;

Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, <BROKERS>);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");

Note that SaslConfigs.SASL_JAAS_CONFIG support was added in Kafka 0.10.2. Prior to that you need to use a JAAS file. See Kafka "Login module not specified in JAAS config" for details.
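For those older clients, the equivalent configuration lives in a standalone JAAS file passed to the JVM via the `java.security.auth.login.config` system property. A minimal sketch (the file name and credentials are placeholders):

```
// kafka_client_jaas.conf (hypothetical file name)
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="username"
    password="password";
};
```

Start the application with `-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf` so the client picks it up.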

I recommend you start using the latest Kafka version if possible.
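Putting it together, a minimal set of consumer properties for SASL_SSL with PLAIN might look like the sketch below. Broker list, group id, and credentials are placeholders; string keys are used here so the snippet compiles without the client constant classes, but they are the same keys the constants resolve to:

```java
import java.util.Properties;

public class SaslConsumerConfig {
    // Builds the minimal property set for a SASL_SSL + PLAIN consumer.
    // All values passed in are placeholders, not real endpoints or credentials.
    static Properties build(String brokers, String username, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", "consumer_group");
        props.put("security.protocol", "SASL_SSL");
        // sasl.mechanism was missing in the question's original configuration
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + username + "\" password=\"" + password + "\";");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("broker1:9093,broker2:9093", "user", "secret");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

These properties can then be handed straight to `new KafkaConsumer<String, String>(props)`.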

Excommunicate answered 11/9, 2019 at 11:15 Comment(8)
I have implemented it as you said, but I am getting the subscribed message as offset = 3722972, key = ����0����Ȧ�Ċ7"��JeraldHughson�� KelleJarrette��/�ٿ��[���2Search Icon 1000 1 Apprvr��{ 1568238816678245N2T_NOMINATIONNOMINEE_EVENT�鿐�[*Customer Service RepsUnited StatesValue 2 Can you help me with this? – Croaker
I have updated the question with the deserialize method. Can someone help me? – Croaker
That's a completely different question. Please ask a new question. – Excommunicate
Sure, thanks, will do that. – Croaker
Hi @MickaelMaison, were you able to fix the below? Can you please post what was done: ------------ I have implemented as you said. but i am getting subscribed message as offset = 3722972, key = ����0����Ȧ�Ċ7"��JeraldHughson�� KelleJarrette��/�ٿ��[���2Search Icon 1000 1 Apprvr��{ 1568238816678245N2T_NOMINATIONNOMINEE_EVENT�鿐�[*Customer Service RepsUnited StatesValue 2 can you help me with this? ------------ – Stines
How did you find out about classes like SaslConfigs or CommonClientConfigs? I don't see any Javadoc or documentation for these. For example, where can I find "key.serializer"? – Giselagiselbert
Never mind, I had to look at the actual source code. It's present in the ProducerConfig class. But that is very bad documentation (it's impossible to find unless one knows the class name) compared to other languages/frameworks like Go, Node, React etc. Poor documentation discoverability for the Kafka APIs. – Giselagiselbert
If you have ideas how to improve the docs, feel free to file a ticket or open a PR. – Excommunicate