How to create Kafka-python producer with ssl configuration
Asked Answered
O

4

7

I'm trying to create kafka producer with ssl. I need information on how to set SSL parameters in the constructor, the information provided in kafka-python client is not descriptive enough.

What are the ssl_certfile, ssl_cafile, ssl_keyfile parameters. I'm not sure where to look for these files.

producer = KafkaProducer(bootstrap_servers=kafka_broker,
  value_serializer=lambda v: json.dumps(v).encode('utf-8'),
  security_protocol='SSL',
  api_version=(0,10),
  ssl_cafile='ca-certs.pem',ssl_certfile='server.pem',
  ssl_keyfile='server.pem',ssl_password='xxx')
producer.send('rk976772_topic',{"test":0})

Traceback (most recent call last): File "", line 1, in File "/usr/lib/python2.7/site-packages/kafka/producer/kafka.py", line 543, in send self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0) File "/usr/lib/python2.7/site-packages/kafka/producer/kafka.py", line 664, in _wait_on_metadata "Failed to update metadata after %.1f secs." % max_wait) kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

Overlie answered 22/8, 2018 at 3:33 Comment(1)
IT could be an issue with the topic name that you may be giving wrong.Calculating
O
0

I had to publish the message over SASL_SSL Used below code to create a producer with SASL_SSL protocol.

from kafka import KafkaProducer

security_protocol=environment_params.kafka_security_protocol
if env=='dev':
    if security_protocol=='SASL_SSL':
        producer = KafkaProducer(bootstrap_servers=environment_params.dev_kafka_broker,value_serializer=lambda v: json.dumps(v).encode('utf-8'),security_protocol=security_protocol,ssl_cafile='ca-certs.pem',sasl_mechanism='GSSAPI',api_version=environment_params.dev_kafka_api_version)
    elif security_protocol=='PLAINTEXT':
        producer = KafkaProducer(bootstrap_servers=environment_params.dev_kafka_broker,value_serializer=lambda v: json.dumps(v).encode('utf-8'))
Overlie answered 7/9, 2018 at 15:48 Comment(1)
sasl_mechanism='GSSAPI' is only required if sasl.enabled.mechanisms=PLAIN is not set in the kafka configuration. I accidentally set SASL_ENABLED_MECHANISMS instead of KAFKA_SASL_ENABLED_MECHANISMS as a docker container environment variable hence sasl.enabled.mechanisms remained on the default which is GSSAPI.Aqua
V
16

I was having this issue as well as many other while trying to configure kafka with SSL or SASL_SSL. I'm posting a full tutorial here in case anyone else runs into the same issues. I am using kafka-python 1.4.6 with kafka 2.2.0 on CentOS 6.

Below are the configurations that worked for me for SASL_SSL using kafka-python client. These configurations can be used for PLAINTEXT and SSL security protocols along with SASL_SSL and SASL_PLAINTEXT.

Bash script to generate key files, CARoot, and self-signed cert for use with SSL:

#!/bin/bash
#Step 1
keytool -keystore server.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
#Step 2
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
#Step 3
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:admin123
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed

You can then use the following command to extract the CARoot.pem:

keytool -exportcert -alias CARoot -keystore server.keystore.jks -rfc -file CARoot.pem

In my server.properties file I have:

listeners=PLAINTEXT://localhost:9091,SASL_PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094
security.protocol=SSL
sasl.enabled.mechanisms=PLAIN
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=admin123
ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=admin123
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
advertised.listeners=PLAINTEXT://localhost:9091,SASL_PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094

In my JAAS configuration file(/etc/kafka/kafka_plain_jaas.conf):

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
   username=kafka
   password=kafka-secret
   user_username=password;
};
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
  username=username
  password=password;
};

Before starting the Kafka server, need to run the following:

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_plain_jaas.conf"

Python consumer and producer: The ssl_context and api_version are what caused SSL handshake errors to occur for me, leading to a timeout. So I commented those out. (There were some tutorials out there that mentioned to use those.)

from kafka import KafkaConsumer, KafkaProducer
import kafka
import ssl
import logging
logging.basicConfig(level=logging.DEBUG)

try:
    topic = "sendMessage"
    sasl_mechanism = "PLAIN"
    username = "username"
    password = "password"
    security_protocol = "SASL_SSL"

    #context = ssl.create_default_context()
    #context.options &= ssl.OP_NO_TLSv1
    #context.options &= ssl.OP_NO_TLSv1_1

    consumer = KafkaConsumer(topic, bootstrap_servers='localhost:9094',
                              #api_version=(0, 10),
                              security_protocol=security_protocol,
                              #ssl_context=context,
                              ssl_check_hostname=True,
                              ssl_cafile='../keys/CARoot.pem',
                              sasl_mechanism = sasl_mechanism,
                              sasl_plain_username = username,
                              sasl_plain_password = password)
                              #ssl_certfile='../keys/certificate.pem',
                              #ssl_keyfile='../keys/key.pem')#,api_version = (0, 10))

    producer = KafkaProducer(bootstrap_servers='localhost:9094',
                             #api_version=(0, 10),
                             security_protocol=security_protocol,
                             #ssl_context=context,
                             ssl_check_hostname=True,
                             ssl_cafile='../keys/CARoot.pem',
                             sasl_mechanism=sasl_mechanism,
                             sasl_plain_username=username,
                             sasl_plain_password=password)
                              #ssl_certfile='../keys/certificate.pem',
                              #ssl_keyfile='../keys/key.pem')#, api_version = (0,10))
    # Write hello world to test topic
    producer.send(topic, bytes("Hello World SSL"))
    producer.flush()


    for msg in consumer:
        print(msg)


except Exception as e:
    print e
Varicotomy answered 19/4, 2019 at 19:44 Comment(0)
J
3

Dudes, watch carefully and follow the instructions...

Step 1: Run all scripts (if necessary, set the values)

keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -validity {validity} -genkey

openssl req -new -x509 -keyout ca-key -out ca-cert -days {validity}

keytool -keystore kafka.client.truststore.jks -alias CARoot -importcert -file ca-cert

keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert

keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file

openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}

keytool -keystore kafka.server.keystore.jks -alias CARoot -importcert -file ca-cert

keytool -keystore kafka.server.keystore.jks -alias localhost -importcert -file cert-signed

keytool -exportcert -alias CARoot -keystore kafka.server.keystore.jks -rfc -file CARoot.pem

As a result, you will get:

kafka.server.keystore.jks, kafka.server.truststore.jks, kafka.client.truststore.jks, ca-cert, ca-cert.srl, ca-key, cert-file, cert-signed, CARoot.pem

Step 2: Copy kafka.server.keystore.jks and kafka.server.truststore.jks to the server and modify server.properties file (it is located in the config folder)

listeners=PLAINTEXT://MYSERVER:9092,SSL://MYSERVER:9093
advertised.listeners=PLAINTEXT://MYSERVER:9092,SSL://MYSERVER:9093
ssl.keystore.location=../store/kafka.server.keystore.jks
ssl.keystore.password=qwerty
ssl.truststore.location=../store/kafka.server.truststore.jks
ssl.truststore.password=qwerty
ssl.client.auth=required
ssl.endpoint.identification.algorithm=

Step 3: Create the python program

def kafka_consumer_ssl():
consumer = KafkaConsumer('test_topic',
                         bootstrap_servers=['MYSERVER:9093'],
                         auto_offset_reset='earliest',
                         enable_auto_commit=True,
                         value_deserializer=lambda x: x.decode('utf-8'),
                         security_protocol='SSL',
                         ssl_check_hostname=False,
                         ssl_cafile='CARoot.pem',
                         ssl_certfile='ca-cert',
                         ssl_keyfile='ca-key',
                         ssl_password='qwerty'
                         )

for event in consumer:
    print(event.value)

kafka_consumer_ssl()

Step 4: Enjoy !!!

Janiuszck answered 28/10, 2021 at 15:6 Comment(1)
Thank you very much ,This solution worked for me. Could you please share SSL Producer sample code.Lacefield
O
0

I had to publish the message over SASL_SSL Used below code to create a producer with SASL_SSL protocol.

from kafka import KafkaProducer

security_protocol=environment_params.kafka_security_protocol
if env=='dev':
    if security_protocol=='SASL_SSL':
        producer = KafkaProducer(bootstrap_servers=environment_params.dev_kafka_broker,value_serializer=lambda v: json.dumps(v).encode('utf-8'),security_protocol=security_protocol,ssl_cafile='ca-certs.pem',sasl_mechanism='GSSAPI',api_version=environment_params.dev_kafka_api_version)
    elif security_protocol=='PLAINTEXT':
        producer = KafkaProducer(bootstrap_servers=environment_params.dev_kafka_broker,value_serializer=lambda v: json.dumps(v).encode('utf-8'))
Overlie answered 7/9, 2018 at 15:48 Comment(1)
sasl_mechanism='GSSAPI' is only required if sasl.enabled.mechanisms=PLAIN is not set in the kafka configuration. I accidentally set SASL_ENABLED_MECHANISMS instead of KAFKA_SASL_ENABLED_MECHANISMS as a docker container environment variable hence sasl.enabled.mechanisms remained on the default which is GSSAPI.Aqua
E
-1

Thanks Alot. I was having jks file and my kafka-producer was giving continuously error SSL Certification verify error 897 Though converted the CARoot.pem file but it was not working.

What helped is I converted using the below command and used on producer and it worked.

kafka.server.keystore.jks, 
kafka.server.truststore.jks, 
kafka.client.truststore.jks, 
ca-cert, 
ca-cert.srl, 
ca-key, cert-file, 
cert-signed, 
CARoot.pem
Expostulation answered 10/2, 2022 at 7:1 Comment(1)
@outlak What is the command?Lecialecithin

© 2022 - 2025 — McMap. All rights reserved.