I've set up SSL on my local Kafka instance, and when I start the Kafka console producer/consumer on the SSL port, I get an SSL handshake error:
Karans-MacBook-Pro:keystore karanalang$ $CONFLUENT_HOME/bin/kafka-console-producer --broker-list localhost:9093 --topic karantest --producer.config $CONFLUENT_HOME/props/client-ssl.properties
>[2021-11-10 13:15:09,824] ERROR [Producer clientId=console-producer] Connection to node -1 (localhost/127.0.0.1:9093) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2021-11-10 13:15:09,826] WARN [Producer clientId=console-producer] Bootstrap broker localhost:9093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-11-10 13:15:10,018] ERROR [Producer clientId=console-producer] Connection to node -1 (localhost/127.0.0.1:9093) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2021-11-10 13:15:10,019] WARN [Producer clientId=console-producer] Bootstrap broker localhost:9093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-11-10 13:15:10,195] ERROR [Producer clientId=console-producer] Connection to node -1 (localhost/127.0.0.1:9093) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
Here are the changes I made:
- created the truststore and keystore
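The exact commands used to create the stores are not shown in this post. For comparison, a self-signed setup for a single local broker might look like the sketch below. The function name, alias, file names and validity period are all hypothetical; the point of interest is the `-ext SAN=...` option, which puts `localhost` into the certificate so that it matches the host name the clients connect to.

```shell
# Hypothetical sketch: create a self-signed broker keystore and a matching truststore.
# Alias, file names and validity are illustrative, not taken from the original setup.
make_local_stores() {
  local dir="$1" pass="$2"

  # Key pair whose certificate names localhost in both the CN and the SAN extension.
  keytool -genkeypair -alias localhost -keyalg RSA -keysize 2048 -validity 365 \
    -dname "CN=localhost" -ext "SAN=DNS:localhost,IP:127.0.0.1" \
    -keystore "$dir/kafka.keystore.jks" -storetype JKS \
    -storepass "$pass" -keypass "$pass"

  # Export the self-signed certificate in PEM form ...
  keytool -exportcert -alias localhost -rfc -file "$dir/broker.crt" \
    -keystore "$dir/kafka.keystore.jks" -storepass "$pass"

  # ... and import it into the truststore that clients will use.
  keytool -importcert -noprompt -alias localhost -file "$dir/broker.crt" \
    -keystore "$dir/kafka.truststore.jks" -storetype JKS -storepass "$pass"
}

# Usage (assumed directory and password):
#   make_local_stores "$CONFLUENT_HOME/ssl_certs" test123
```

Running `keytool -list -v -keystore kafka.keystore.jks` afterwards should list a SubjectAlternativeName section if the extension was applied.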
Here is the output of the openssl command I used to check SSL connectivity:
Karans-MacBook-Pro:keystore karanalang$ openssl s_client -debug -connect localhost:9093 -tls1
CONNECTED(00000005)
write to 0x13d7bdf90 [0x13e01ea03] (118 bytes => 118 (0x76))
0000 - 16 03 01 00 71 01 00 00-6d 03 01 81 e8 00 cd c4 ....q...m.......
0010 - 04 4b 64 86 3e 30 97 32-c3 66 3a 8c ed 05 bf 97 .Kd.>0.2.f:.....
0020 - ff d5 b2 a4 26 fe 99 c0-7f 94 a1 00 00 2e c0 14 ....&...........
0030 - c0 0a 00 39 ff 85 00 88-00 81 00 35 00 84 c0 13 ...9.......5....
---
0076 - <SPACES/NULS>
read from 0x13d7bdf90 [0x13e01a803] (5 bytes => 5 (0x5))
0005 - <SPACES/NULS>
4307385836:error:1400410B:SSL routines:CONNECT_CR_SRVR_HELLO:wrong version number:/System/Volumes/Data/SWE/macOS/BuildRoots/e90674e518/Library/Caches/com.apple.xbs/Sources/libressl/libressl-56.60.2/libressl-2.8/ssl/ssl_pkt.c:386:
---
no peer certificate available
---
No client certificate CA names sent
---
SSL handshake has read 5 bytes and written 0 bytes
---
New, (NONE), Cipher is (NONE)
Secure Renegotiation IS NOT supported
Compression: NONE
Expansion: NONE
No ALPN negotiated
SSL-Session:
Protocol : TLSv1
Cipher : 0000
Session-ID:
Session-ID-ctx:
Master-Key:
Start Time: 1636579015
Timeout : 7200 (sec)
Verify return code: 0 (ok)
---
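A note on the check above: `-tls1` forces TLS 1.0, which Kafka brokers of this generation do not enable by default (`ssl.enabled.protocols` defaults to TLSv1.2, plus TLSv1.3 on Java 11 or newer). The `wrong version number` error may therefore come from the probe itself rather than from the broker setup. Below is a minimal sketch of a probe that negotiates TLS 1.2 and prints the names in the certificate the broker presents. The function name `show_broker_cert` is hypothetical.

```shell
# Sketch: print the subject and SAN of the certificate presented on an SSL listener.
show_broker_cert() {
  local hostport="${1:-localhost:9093}"
  local out

  # -tls1_2 instead of -tls1; </dev/null closes stdin so s_client exits after the handshake.
  out=$(openssl s_client -connect "$hostport" -tls1_2 </dev/null 2>/dev/null)

  # openssl x509 skips the surrounding s_client text and reads the first PEM certificate.
  printf '%s\n' "$out" | openssl x509 -noout -subject
  # Prints nothing (and returns non-zero) if the certificate has no SAN extension.
  printf '%s\n' "$out" | openssl x509 -noout -text | grep -A1 'Subject Alternative Name'
}

# Usage:
#   show_broker_cert localhost:9093
```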
Here is the server.properties:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
# SSL CHANGE
listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
# SSL CHANGE
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
ssl.client.auth=none
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
##################### Confluent Metrics Reporter #######################
# Confluent Control Center and Confluent Auto Data Balancer integration
#
# Uncomment the following lines to publish monitoring data for
# Confluent Control Center and Confluent Auto Data Balancer
# If you are using a dedicated metrics cluster, also adjust the settings
# to point to your metrics kakfa cluster.
#metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
#confluent.metrics.reporter.bootstrap.servers=localhost:9092
#
# Uncomment the following line if the metrics cluster has a single broker
#confluent.metrics.reporter.topic.replicas=1
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
############################# Confluent Authorizer Settings #############################
# Uncomment to enable Confluent Authorizer with support for ACLs, LDAP groups and RBAC
#authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
# Semi-colon separated list of super users in the format <principalType>:<principalName>
#super.users=
# Specify a valid Confluent license. By default free-tier license will be used
#confluent.license=
# Replication factor for the topic used for licensing. Default is 3.
confluent.license.topic.replication.factor=1
# Uncomment the following lines and specify values where required to enable CONFLUENT provider for RBAC and centralized ACLs
# Enable CONFLUENT provider
#confluent.authorizer.access.rule.providers=ZK_ACL,CONFLUENT
# Bootstrap servers for RBAC metadata. Must be provided if this broker is not in the metadata cluster
#confluent.metadata.bootstrap.servers=PLAINTEXT://127.0.0.1:9092
# Replication factor for the metadata topic used for authorization. Default is 3.
confluent.metadata.topic.replication.factor=1
# Replication factor for the topic used for audit logs. Default is 3.
confluent.security.event.logger.exporter.kafka.topic.replicas=1
# Listeners for metadata server
#confluent.metadata.server.listeners=http://0.0.0.0:8090
# Advertised listeners for metadata server
#confluent.metadata.server.advertised.listeners=http://127.0.0.1:8090
############################# Confluent Data Balancer Settings #############################
# The Confluent Data Balancer is used to measure the load across the Kafka cluster and move data
# around as necessary. Comment out this line to disable the Data Balancer.
confluent.balancer.enable=true
# By default, the Data Balancer will only move data when an empty broker (one with no partitions on it)
# is added to the cluster or a broker failure is detected. Comment out this line to allow the Data
# Balancer to balance load across the cluster whenever an imbalance is detected.
#confluent.balancer.heal.uneven.load.trigger=ANY_UNEVEN_LOAD
# The default time to declare a broker permanently failed is 1 hour (3600000 ms).
# Uncomment this line to turn off broker failure detection, or adjust the threshold
# to change the duration before a broker is declared failed.
#confluent.balancer.heal.broker.failure.threshold.ms=-1
# Edit and uncomment the following line to limit the network bandwidth used by data balancing operations.
# This value is in bytes/sec/broker. The default is 10MB/sec.
#confluent.balancer.throttle.bytes.per.second=10485760
# Capacity Limits -- when set to positive values, the Data Balancer will attempt to keep
# resource usage per-broker below these limits.
# Edit and uncomment this line to limit the maximum number of replicas per broker. Default is unlimited.
#confluent.balancer.max.replicas=10000
# Edit and uncomment this line to limit what fraction of the log disk (0-1.0) is used before rebalancing.
# The default (below) is 85% of the log disk.
#confluent.balancer.disk.max.load=0.85
# Edit and uncomment these lines to define a maximum network capacity per broker, in bytes per
# second. The Data Balancer will attempt to ensure that brokers are using less than this amount
# of network bandwidth when rebalancing.
# Here, 10MB/s. The default is unlimited capacity.
#confluent.balancer.network.in.max.bytes.per.second=10485760
#confluent.balancer.network.out.max.bytes.per.second=10485760
# Edit and uncomment this line to identify specific topics that should not be moved by the data balancer.
# Removal operations always move topics regardless of this setting.
#confluent.balancer.exclude.topic.names=
# Edit and uncomment this line to identify topic prefixes that should not be moved by the data balancer.
# (For example, a "confluent.balancer" prefix will match all of "confluent.balancer.a", "confluent.balancer.b",
# "confluent.balancer.c", and so on.)
# Removal operations always move topics regardless of this setting.
#confluent.balancer.exclude.topic.prefixes=
# The replication factor for the topics the Data Balancer uses to store internal state.
# For anything other than development testing, a value greater than 1 is recommended to ensure availability.
# The default value is 3.
confluent.balancer.topic.replication.factor=1
################################## Confluent Telemetry Settings ##################################
# To start using Telemetry, first generate a Confluent Cloud API key/secret. This can be done with
# instructions at https://docs.confluent.io/current/cloud/using/api-keys.html. Note that you should
# be using the '--resource cloud' flag.
#
# After generating an API key/secret, to enable Telemetry uncomment the lines below and paste
# in your API key/secret.
#
#confluent.telemetry.enabled=true
#confluent.telemetry.api.key=<CLOUD_API_KEY>
#confluent.telemetry.api.secret=<CCLOUD_API_SECRET>
############ SSL #################
ssl.truststore.location=/Users/karanalang/Documents/Technology/confluent-6.2.1/ssl_certs/truststore/kafka.truststore.jks
ssl.truststore.password=test123
ssl.keystore.location=/Users/karanalang/Documents/Technology/confluent-6.2.1/ssl_certs/keystore/kafka.keystore.jks
ssl.keystore.password=test123
ssl.key.password=test123
# confluent.metrics.reporter.bootstrap.servers=localhost:9093
# confluent.metrics.reporter.security.protocol=SSL
# confluent.metrics.reporter.ssl.truststore.location=/Users/karanalang/Documents/Technology/confluent-6.2.1/ssl_certs/truststore/kafka.truststore.jks
# confluent.metrics.reporter.ssl.truststore.password=test123
# confluent.metrics.reporter.ssl.keystore.location=/Users/karanalang/Documents/Technology/confluent-6.2.1/ssl_certs/keystore/kafka.keystore.jks
# confluent.metrics.reporter.ssl.keystore.password=test123
# confluent.metrics.reporter.ssl.key.password=test123
client-ssl.properties:
bootstrap.servers=localhost:9093
security.protocol=SSL
ssl.truststore.location=/Users/karanalang/Documents/Technology/confluent-6.2.1/ssl_certs/truststore/kafka.truststore.jks
ssl.truststore.password=test123
ssl.keystore.location=/Users/karanalang/Documents/Technology/confluent-6.2.1/ssl_certs/keystore/kafka.keystore.jks
ssl.keystore.password=test123
ssl.key.password=test123
Commands to start the console producer/consumer:
$CONFLUENT_HOME/bin/kafka-console-producer --broker-list localhost:9093 --topic karantest --producer.config $CONFLUENT_HOME/props/client-ssl.properties
$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9093 --topic karantest --consumer.config $CONFLUENT_HOME/props/client-ssl.properties --from-beginning
Any ideas on how to resolve this?
Update: This is the error I get when debugging (using export KAFKA_OPTS=-Djavax.net.debug=all):
javax.net.ssl|DEBUG|0E|kafka-producer-network-thread | console-producer|2021-11-10 14:04:26.107 PST|SSLExtensions.java:173|Ignore unavailable extension: status_request
javax.net.ssl|DEBUG|0E|kafka-producer-network-thread | console-producer|2021-11-10 14:04:26.107 PST|SSLExtensions.java:173|Ignore unavailable extension: status_request
javax.net.ssl|ERROR|0E|kafka-producer-network-thread | console-producer|2021-11-10 14:04:26.108 PST|TransportContext.java:341|Fatal (CERTIFICATE_UNKNOWN): No name matching localhost found (
"throwable" : {
java.security.cert.CertificateException: No name matching localhost found
at java.base/sun.security.util.HostnameChecker.matchDNS(HostnameChecker.java:234)
at java.base/sun.security.util.HostnameChecker.match(HostnameChecker.java:103)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:429)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:283)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1335)
at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1232)
at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1175)
at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443)
at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1074)
at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1008)
at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:509)
at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:601)
at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:447)
at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:332)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:229)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:563)
at org.apache.kafka.common.network.Selector.poll(Selector.java:499)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:639)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:327)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
at java.base/java.lang.Thread.run(Thread.java:829)}
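The CertificateException above points at the client's hostname verification: the certificate presented by the broker apparently has neither a CN nor a SAN entry matching `localhost`. Kafka clients since 2.0 enable this check by default (`ssl.endpoint.identification.algorithm=https`). Reissuing the certificate with a matching name is the cleaner fix. For local testing only, the check can be switched off by setting that property to an empty value in client-ssl.properties. A sketch, with a hypothetical helper name:

```shell
# Local-testing sketch only: turn off client-side hostname verification.
# An empty value disables the check; Kafka 2.0+ clients default to "https".
disable_hostname_check() {
  local props="$1"
  # Append the override only if the property is not already set in the file.
  grep -q '^ssl.endpoint.identification.algorithm=' "$props" \
    || printf '\nssl.endpoint.identification.algorithm=\n' >> "$props"
}

# Usage (path taken from the commands in this post):
#   disable_hostname_check "$CONFLUENT_HOME/props/client-ssl.properties"
```

This should not be carried over to anything beyond a development machine, since it removes the protection against a server presenting a certificate issued for a different host.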