Facing issue in Connecting Kafka 3.0 - org.apache.kafka.common.KafkaException: Failed to load SSL keystore
Asked Answered
B

3

6

I am trying to connect to Kafka 3.0 with SSL but facing issue with loading SSL keystore

I have tried many possible values, but no help

I have tried changing the locations, changing the value of the location, but still that didnt help

package uk.co.argos.services.pas.StepDefinations;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.json.JSONException;


import java.io.IOException;
import java.math.BigInteger;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

public class Test {

    public static List<String> test1 = new ArrayList<>();

    public static List<String> test2 = new ArrayList<>();

    public static String BootStrapServers = "kafka-apps2-1.eu-west-1.dev.deveng.systems:9093,kafka-apps2-2.eu-west-1.dev.deveng.systems:9093,kafka-apps2-3.eu-west-1.dev.deveng.systems:9093";
    public static String iODErrorTopicName = "argos-dev-carrier-preadvice-updates-v1";


    public static Consumer<Long, String> createConsumer(String BOOTSTRAPSERVERS, String Topic) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAPSERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());



        props.put("security.protocol","SSL");

        props.put("ssl.protocol","SSL");

        props.put("ssl.truststore.location","/kafka.truststore.jks");
        props.put("ssl.truststore.password","changeit");
        props.put("ssl.keystore.location","/kafka.keystore.jks");

        props.put("ssl.keystore.type","JKS");

        props.put("ssl.keystore.password","scdt@best");
        props.put("ssl.key.password","scdtisbest");
        // Create the consumer using props.
        final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic.
        consumer.subscribe(Collections.singletonList(Topic));
        return consumer;
    }

    public static void ReadMessageinKafka_iODErrorTopic(String OrderNo) throws ExecutionException, InterruptedException {

        final Consumer<Long, String> consumer = createConsumer(BootStrapServers, iODErrorTopicName);
        final int giveUp = 25;   int noRecordsCount = 0;

        while (true) {
            final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);
            if (consumerRecords.count()==0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }
            consumerRecords.forEach(record -> {
                System.out.println("Consumer Record:"+record.value());
                if(record.value().contains(OrderNo)){
                    String inValidRecord=record.value();
                    System.out.println("\nFOUND THE MESSAGE");
                    assertNotNull(inValidRecord);

                }
                else{
                    System.out.println("\nMessage didnt found in Kafka");
                    assertEquals("2","3");
                }
            });
            consumer.commitAsync();
        }
        consumer.close();
        System.out.println("Found the Invalid Message in Kafka - iOD Error Topic");

    }

    public static void main(String[] args) throws ParseException, IOException, JSONException, ExecutionException, InterruptedException {

        ReadMessageinKafka_iODErrorTopic("AD106393581");


    }

}

ERROR FACED:

11:33:58.649 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [kafka-apps2-1.eu-west-1.dev.deveng.systems:9093, kafka-apps2-2.eu-west-1.dev.deveng.systems:9093, kafka-apps2-3.eu-west-1.dev.deveng.systems:9093]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = KafkaExampleConsumer
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = SSL
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = [hidden]
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = /kafka.keystore.jks
    ssl.keystore.password = [hidden]
    ssl.keystore.type = JKS
    ssl.protocol = SSL
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = /kafka.truststore.jks
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

11:33:58.668 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=KafkaExampleConsumer] Initializing the Kafka consumer
11:33:59.046 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(id = null, nodes = [kafka-apps2-1.eu-west-1.dev.deveng.systems:9093 (id: -1 rack: null), kafka-apps2-3.eu-west-1.dev.deveng.systems:9093 (id: -3 rack: null), kafka-apps2-2.eu-west-1.dev.deveng.systems:9093 (id: -2 rack: null)], partitions = [], controller = null)
11:34:00.990 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=KafkaExampleConsumer] Kafka consumer has been closed
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:799)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:650)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630)
    at uk.co.argos.services.pas.StepDefinations.Test.createConsumer(Test.java:63)
    at uk.co.argos.services.pas.StepDefinations.Test.ReadMessageinKafka_iODErrorTopic(Test.java:71)
    at uk.co.argos.services.pas.StepDefinations.Test.main(Test.java:103)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /kafka.keystore.jks of type JKS
    at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:64)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:713)
    ... 5 more
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /kafka.keystore.jks of type JKS
    at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:137)
    at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:62)
    ... 9 more
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /kafka.keystore.jks of type JKS
    at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:330)
    at org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:218)
    at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:135)
    ... 10 more
Caused by: java.io.FileNotFoundException: \kafka.keystore.jks (The system cannot find the file specified)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at java.io.FileInputStream.<init>(FileInputStream.java:93)
    at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:323)
    ... 12 more

Process finished with exit code 1

Can anyone please help, what could be the issue with the keystore? As it says " Failed to load SSL keystore /kafka.keystore.jks of type JKS"

Birkett answered 7/11, 2019 at 11:42 Comment(1)
kafka 3.0?? kafka.apache.org/downloads website says : 2.4.0 is the latest release. The current stable version is 2.4.0. where did u get 3.0 from?Copycat
V
1

The error seems obvious...

Caused by: java.io.FileNotFoundException: \kafka.keystore.jks (The system cannot find the file specified)

Vampire answered 7/11, 2019 at 13:58 Comment(2)
Yes, where to keep this file in system, and where do we get it?Birkett
You have to create it and add your keys to it, and place it somewhere on the file system and set the property to that location.Vampire
F
4

I had the same issue, unfortunately kafka is not able to read the keystore from the classpath if it is a resource inside your war or jar, I solved the issue reading the resource and storing it to a temporary file passing the absolute path to the kafka configuration.

    @Value("classpath:yourkeystore.jks")
    private Resource keyStore;
           
    public static String saveResourceToTempFile(Resource resource,String outName,String outSuffix) {
            try {
                InputStream source = resource.getInputStream();
                int readBytes;
                File temp = File.createTempFile(outName, outSuffix);
                byte[] buffer = new byte[source.available()];
                source.read(buffer);
                OutputStream outStream = new FileOutputStream(temp);
                outStream.write(buffer);
                temp.deleteOnExit();
                return temp.getAbsolutePath();
            } catch (IOException ioex) {
               ...
            }
    }
    
        ...
        // in the config bean
            props.put(
               SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
               KafkaConfiguration.saveResourceToTempFile(keyStore,"keyStore","jks")
            );
        ...
Fascia answered 26/2, 2020 at 23:37 Comment(3)
how would you add a password to the truststore using this? I'm also loading this from a kafka.properties file (that works but inside it, the truststore is not found.) any workaround?Kimber
the config bean part is a snip... of course you can specify also password and whichever parameter you need as in the original question ... i.e. props.put("ssl.truststore.password","yourpassword"); if instead your question is about adding a password to the truststore , well look in the openssl manual ;)Fascia
Good answer! Though it is easier to copy a file via Files.copy(inputStream, Paths.get(temp.getAbsolutePath()), StandardCopyOption.REPLACE_EXISTING);Elusion
V
1

The error seems obvious...

Caused by: java.io.FileNotFoundException: \kafka.keystore.jks (The system cannot find the file specified)

Vampire answered 7/11, 2019 at 13:58 Comment(2)
Yes, where to keep this file in system, and where do we get it?Birkett
You have to create it and add your keys to it, and place it somewhere on the file system and set the property to that location.Vampire
E
-1

I had the same problem. Turns out that my machine's default java is java 11. It had to be java 8.

The following command will call up the menu to choose default java version:

sudo update-alternatives --config java

Use it to choose Java 8 as default. Then restart deployment and wait a few minutes.

Escallop answered 5/8, 2021 at 1:37 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.