I've written a Python script with aiokafka to produce to and consume from a Kafka cluster in AWS MSK. I'm running the script from an EC2 instance that is in the same VPC as my cluster, and when I try to connect the script to the cluster, the connection is refused:
The script
from aiokafka import AIOKafkaConsumer
import asyncio
import os
import sys


async def consume():
    bootstrap_server = os.environ.get('BOOTSTRAP_SERVER', 'localhost:9092')
    topic = os.environ.get('TOPIC', 'demo')
    group = os.environ.get('GROUP_ID', 'demo-group')
    consumer = AIOKafkaConsumer(
        topic, bootstrap_servers=bootstrap_server, group_id=group
    )
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()


def main():
    try:
        asyncio.run(consume())
    except KeyboardInterrupt:
        print("Bye!")
        sys.exit(0)


if __name__ == "__main__":
    print("Welcome to Kafka test script. ctrl + c to exit")
    main()
The exception
Unable to request metadata from "boot-xxxxxxx.cx.kafka-serverless.us-xxxx-1.amazonaws.com:9098": KafkaConnectionError: Connection at boot-xxxxxxx.cx.kafka-serverless.us-xxxx-1.amazonaws.com:9098 closed
Traceback (most recent call last):
  File "producer.py", line 33, in <module>
    main()
  File "producer.py", line 25, in main
    asyncio.run(produce_message(message))
  File "/usr/lib64/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/usr/lib64/python3.7/asyncio/base_events.py", line 587, in run_until_complete
    return future.result()
  File "producer.py", line 12, in produce_message
    await producer.start()
  File "/home/ec2-user/py-kafka-test/pykafka/lib64/python3.7/site-packages/aiokafka/producer/producer.py", line 296, in start
    await self.client.bootstrap()
  File "/home/ec2-user/py-kafka-test/pykafka/lib64/python3.7/site-packages/aiokafka/client.py", line 250, in bootstrap
    f'Unable to bootstrap from {self.hosts}')
kafka.errors.KafkaConnectionError: KafkaConnectionError: Unable to bootstrap from [('boot-zm5x2eaw.c3.kafka-serverless.us-east-1.amazonaws.com', 9098, <AddressFamily.AF_UNSPEC: 0>)]
Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7f76d123a510>
I've already tested the connection with the Kafka shell scripts, and it worked fine:
./kafka-console-producer.sh --bootstrap-server boot-xxxxxxx.cx.kafka-serverless.us-xxxx-1.amazonaws.com:9098 --producer.config client.properties --topic myTopic
But whenever I try with Python, it just doesn't work. I investigated a little and found that the cause might be the authentication protocol: my MSK cluster is protected with IAM role-based authentication, but no matter how much I search, I can't find any documentation on how to authenticate with IAM in the Python Kafka libraries (aiokafka, kafka-python, faust, etc.).
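To narrow down whether this is really a protocol problem rather than a network one, a check along the following lines might help. It is only a sketch using the standard library, not aiokafka: it attempts a bare TLS handshake against the bootstrap address. The reasoning is that port 9098 is the MSK listener for IAM access control, which runs over TLS, while aiokafka defaults to `security_protocol="PLAINTEXT"` when nothing else is passed, as in the script above. The helper names `parse_bootstrap` and `tls_handshake_ok` are hypothetical, and the assumption that `client.properties` uses SASL_SSL is mine, since its contents are not shown here.

```python
import asyncio
import os
import ssl


def parse_bootstrap(server: str) -> tuple:
    """Split 'host:port' into (host, port). Hypothetical helper for this check."""
    host, _, port = server.rpartition(":")
    if not host or not port.isdigit():
        raise ValueError(f"expected 'host:port', got {server!r}")
    return host, int(port)


async def tls_handshake_ok(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TLS handshake with host:port completes within timeout."""
    context = ssl.create_default_context()  # verifies the broker certificate
    try:
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port, ssl=context), timeout
        )
    except (OSError, asyncio.TimeoutError):
        # Covers refused connections, DNS failures, TLS errors and timeouts.
        return False
    writer.close()
    return True


if __name__ == "__main__":
    # Same environment variable and default as the script above.
    server = os.environ.get("BOOTSTRAP_SERVER", "localhost:9092")
    host, port = parse_bootstrap(server)
    print("TLS handshake ok:", asyncio.run(tls_handshake_ok(host, port)))
```

If the handshake succeeds from the EC2 instance while the aiokafka client still fails to bootstrap, that would point toward the client's security settings rather than security groups or routing. It does not test IAM authentication itself.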
Does anyone have an example of how to successfully connect to an MSK Serverless cluster with IAM role-based authentication using Python?