Amazon Managed Streaming for Kafka (MSK) features and performance

I'm evaluating AWS Managed Streaming for Kafka (MSK). I know that it is currently in preview, so it might not have all features or complete documentation. I tried setting up an MSK cluster and was validating whether MSK could fulfill all of our company's use cases/requirements, but it currently lacks documentation and examples.

https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html

I have the following queries:

i) How can I access AWS MSK with Kafka clients running on my on-premises systems?

ii) Does MSK support schema evolution and exactly-once semantics?

iii) Will MSK provide some way to update cluster or tuning configuration? For example, AWS Glue allows parameter changes for Spark executor and driver memory in its managed environment.

iv) Is it possible to integrate MSK with other AWS services (e.g. Redshift, EMR, etc.)?

v) Can I use streaming SQL with MSK through KSQL? How can I set up KSQL with MSK?

vi) How can I perform real-time predictive analysis of data flowing through MSK?

vii) Also, how reliable is MSK compared to other cloud-based Kafka clusters from Azure/Confluent, and are there any performance benchmarks compared to vanilla Kafka? And what is the maximum number of brokers that can be launched in a cluster?

Boxwood answered 28/3, 2019 at 23:9

MSK is basically a vanilla Apache Kafka cluster customized and managed by AWS (with predefined configuration settings based on cluster instance type, number of brokers, etc.) and tuned for the cloud environment.

Ideally, it should be able to do all or most of the things that open-source Kafka supports. Also, if you have a specific use case or requirement that is not documented, I suggest you contact AWS support for further clarification regarding the managed part of the Kafka cluster (maximum number of brokers allowed, reliability, cost).

I will try to answer your questions based on my personal experience:

i) How can I access AWS MSK with Kafka clients running on my on-premises systems?

You cannot access MSK directly from an on-premises or local machine using a Kafka client or Kafka Streams, because the broker URLs and ZooKeeper connection strings are private IPs of the MSK cluster's VPC/subnet. To access the cluster through a Kafka client, you need to launch an EC2 instance in the same VPC as MSK and run the Kafka client (producer/consumer) there.

To access the MSK cluster from a local machine or on-premises systems, you can set up the Kafka REST Proxy framework open-sourced by Confluent and reach the MSK cluster from the outside world via a REST API. This framework is not a full-fledged Kafka client and doesn't allow every Kafka client operation, but you can do most operations on the cluster, from fetching cluster metadata and topic information to producing and consuming messages.

First set up the Confluent repo and the EC2 instance security group (refer to Section 1: Pre-install/set up additional Kafka components, below), and then install/set up the Kafka REST proxy.

sudo yum install confluent-kafka-rest 

Create a file named kafka-rest.properties and add the following content:

bootstrap.servers=PLAINTEXT://10.0.10.106:9092,PLAINTEXT://10.0.20.27:9092,PLAINTEXT://10.0.0.119:9092
zookeeper.connect=10.0.10.83:2181,10.0.20.22:2181,10.0.0.218:2181
schema.registry.url=http://localhost:8081

** Modify the bootstrap server and ZooKeeper URLs/IPs to match your cluster.

Start the REST server:

kafka-rest-start kafka-rest.properties &

Access MSK via the REST API with curl or a REST client/browser.

Get a list of topics

curl "http://localhost:8082/topics"

curl "http://<ec2 instance public ip>:8082/topics"

To access it from an on-premises or local machine, make sure the EC2 instance the REST server is running on has a public IP or Elastic IP attached.

More REST API operations: https://github.com/confluentinc/kafka-rest
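As a quick, hedged illustration (the topic name jsontest matches the example topic used later in this answer; the consumer group cg1 and instance name ci1 are arbitrary examples, and host/port should be adjusted to your setup), producing and consuming a JSON message through the proxy's v2 API looks roughly like this:

# Produce a JSON message to the jsontest topic
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" --data '{"records":[{"value":{"name":"test"}}]}' "http://localhost:8082/topics/jsontest"

# Create a consumer instance in group cg1, subscribe it to the topic, then fetch records
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"name":"ci1","format":"json","auto.offset.reset":"earliest"}' "http://localhost:8082/consumers/cg1"
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["jsontest"]}' "http://localhost:8082/consumers/cg1/instances/ci1/subscription"
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" "http://localhost:8082/consumers/cg1/instances/ci1/records"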

ii) Does MSK support schema evolution and exactly-once semantics?

You can use Avro messages along with the Schema Registry to achieve schema evolution and schema maintenance.

Installing and setting up the Schema Registry is similar to the Confluent kafka-rest proxy.

sudo yum install confluent-schema-registry

Create a file named schema-registry.properties and add the following content:

listeners=http://0.0.0.0:8081
kafkastore.connection.url=10.0.10.83:2181,10.0.20.22:2181,10.0.0.218:2181
kafkastore.bootstrap.servers=PLAINTEXT://10.0.10.106:9092,PLAINTEXT://10.0.20.27:9092,PLAINTEXT://10.0.0.119:9092
kafkastore.topic=_schemas
debug=false

** Modify the bootstrap server and ZooKeeper (connection) URLs/IPs to match your cluster.

Start the schema registry service

schema-registry-start schema-registry.properties &

For more information, refer to: https://github.com/confluentinc/schema-registry

https://docs.confluent.io/current/schema-registry/docs/schema_registry_tutorial.html
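As a hedged sketch of how schema evolution works in practice (the subject name jsontest-value and the record fields are illustrative only), you register a schema version for a topic's value and later post a compatible update:

# Register version 1 of the value schema for the jsontest topic
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema":"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}"}' "http://localhost:8081/subjects/jsontest-value/versions"

# Evolve the schema by adding a field with a default (a backward-compatible change)
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema":"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\",\"default\":0}]}"}' "http://localhost:8081/subjects/jsontest-value/versions"

# Fetch the latest registered version to confirm
curl -X GET "http://localhost:8081/subjects/jsontest-value/versions/latest"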

Exactly-once semantics is an Apache Kafka feature, and although I haven't tested it on MSK, I believe it should be supported since MSK is open-source Apache Kafka underneath.
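For reference, a minimal sketch of the client-side properties involved (these are set in your Kafka Streams or producer configuration, not on the broker; I haven't verified them on MSK, and the transactional id is just an example):

# Kafka Streams application: enable exactly-once processing
processing.guarantee=exactly_once

# Plain producer: idempotent writes, plus a transactional id if you use transactions
enable.idempotence=true
transactional.id=example-transactional-producer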

iii) Will MSK provide some way to update cluster or tuning configuration? Like AWS Glue provides parameter changes for Spark executor and driver memory in its managed environment.

Yes, it is possible to change configuration parameters at runtime. I tested this by changing the retention.ms parameter using the Kafka config tool, and the change was applied to the topic immediately. So I think you can update other parameters as well, but MSK might not allow all config changes, just like AWS Glue allows only a few Spark config parameter changes, because letting users change every parameter could make the managed environment vulnerable.

Change it through the Kafka config tool:

kafka-configs.sh --zookeeper 10.0.10.83:2181,10.0.20.22:2181,10.0.0.218:2181  --entity-type topics --entity-name jsontest --alter --add-config retention.ms=128000

Verify the change using REST:

curl "http://localhost:8082/topics/jsontest"

Now Amazon MSK enables you to create a custom MSK configuration.

Please refer to the doc below for config/parameters that can be updated:

https://docs.aws.amazon.com/msk/latest/developerguide/msk-configuration-properties.html

Also the default configuration of MSK Kafka:

https://docs.aws.amazon.com/msk/latest/developerguide/msk-default-configuration.html
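As a hedged sketch (the configuration name and properties are illustrative), creating a custom configuration with the AWS CLI looks roughly like this, where configuration.txt contains standard Kafka server properties, one per line (e.g. log.retention.hours=48):

aws kafka create-configuration --name "example-msk-config" --description "Custom MSK configuration" --kafka-versions "1.1.1" --server-properties fileb://configuration.txt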

iv) Is it possible to integrate MSK with other AWS services (e.g. Redshift, EMR, etc.)?

Yes, you can connect/integrate other AWS services with MSK. For example, you can run a Kafka client (consumer) to read data from Kafka and write to Redshift, RDS, S3, or DynamoDB. Make sure the Kafka client is running on an EC2 instance (inside the MSK VPC) that has a proper IAM role to access those services, and that the EC2 instance is in a public subnet or in a private subnet (with a NAT gateway or a VPC endpoint for S3).

You can also launch EMR inside the MSK cluster's VPC/subnet and then connect to other services through EMR (Spark).

Spark structured streaming with AWS Managed Streaming for Kafka

Launch an EMR cluster in the VPC of the MSK cluster.

Allow the EMR master and slave security groups in the inbound rules of the MSK cluster's security group for port 9092.

Start Spark shell

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0

Connect to the MSK cluster from Spark structured streaming:

val kafka = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "10.0.10.106:9092,10.0.20.27:9092,10.0.0.119:9092").option("subscribe", "jsontest") .load()

Start reading/printing the messages on the console:

val df=kafka.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("console").start()

or

val df=kafka.selectExpr("CAST(value AS STRING)").writeStream.format("console").start()


v) Can I use streaming SQL with MSK through KSQL? How can I set up KSQL with MSK?

Yes, you can set up KSQL with an MSK cluster. Basically, you need to launch an EC2 instance in the same VPC/subnet as the MSK cluster, then install the KSQL server and client on the EC2 instance and use them.

First set up the Confluent repo and the EC2 instance security group (refer to Section 1: Pre-install/set up additional Kafka components, below).

After that, install the KSQL server:

sudo yum install confluent-ksql 

Create a file named ksql-server.properties and add the following content:

bootstrap.servers=10.0.10.106:9092,10.0.20.27:9092,10.0.0.119:9092
listeners=http://localhost:8088

** Modify the bootstrap server IPs/URLs to match your cluster.

Start the ksql server

ksql-server-start ksql-server.properties &

After that, start the KSQL CLI:

ksql http://localhost:8088

And finally, run the command to get the list of topics:

ksql> SHOW TOPICS;

 Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups 
-----------------------------------------------------------------------------------------
 _schemas    | false      | 1          | 3                  | 0         | 0              
 jsontest    | false      | 1          | 3                  | 1         | 1              
-----------------------------------------------------------------------------------------
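From here you can run actual streaming SQL. A hedged sketch (the column names are illustrative and assume JSON messages on the jsontest topic; on KSQL 5.x a plain SELECT is a continuous streaming query):

ksql> CREATE STREAM json_stream (id INT, name VARCHAR) WITH (KAFKA_TOPIC='jsontest', VALUE_FORMAT='JSON');

ksql> SELECT id, name FROM json_stream;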

For more info, refer to https://github.com/confluentinc/ksql

vi) How can I perform real-time predictive analysis of data flowing through MSK?

Doing predictive analysis or real-time machine learning is really not specific to MSK; whatever you would do with a Kafka cluster (or any streaming pipeline) applies to MSK as well. There are various ways to achieve it depending on your exact requirements, but I will describe the approaches most common or widely used across the industry (see the sketch after this list):

  • Use Spark with MSK (Kafka) and do the analysis through structured streaming and MLlib (hosting your predictive model).

  • Train your predictive model in the H2O.ai framework, export the model as a Java POJO, and integrate the POJO with Kafka consumer code that processes messages from the MSK (Kafka) topic and does the real-time analysis.

  • Train and deploy a model in SageMaker, then get real-time predictions from Kafka client consumer code by invoking the SageMaker model inference endpoint with the Kafka data/messages.
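A minimal sketch of the SageMaker variant (the endpoint name my-model-endpoint is hypothetical; in a real application you would use a proper Kafka consumer instead of the console consumer, and with AWS CLI v2 you would also need --cli-binary-format raw-in-base64-out):

# Pipe each message from the topic to a SageMaker inference endpoint
kafka-console-consumer.sh --bootstrap-server 10.0.10.106:9092 --topic jsontest |
while read -r msg; do
  aws sagemaker-runtime invoke-endpoint --endpoint-name my-model-endpoint --content-type application/json --body "$msg" /dev/stdout
done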

vii) Also, how reliable is MSK compared to other cloud-based Kafka clusters from Azure/Confluent, and are there any performance benchmarks compared to vanilla Kafka? And what is the maximum number of brokers that can be launched in a cluster?

MSK is in preview, as you already know, so it's a little too early to say much about its reliability. But in general, like all other AWS services, it should become more reliable over time, hopefully along with new features and better documentation.

I don't think AWS or any cloud vendor (Azure, Google Cloud) publishes performance benchmarks of their services, so you will have to do performance testing on your side. The Kafka client tools (kafka-producer-perf-test.sh, kafka-consumer-perf-test.sh) provide benchmark scripts that can be run to get an idea of the cluster's performance. Again, performance in a real production scenario will vary a lot depending on factors such as message size, the volume of data coming into Kafka, sync or async producers, the number of consumers, etc., and it will come down to your specific use case rather than a generic benchmark.
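For example, a hedged sketch of benchmark runs against the example brokers used earlier (record counts and sizes are arbitrary):

# Producer benchmark: 1M records of 1KB each, unthrottled
kafka-producer-perf-test.sh --topic jsontest --num-records 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=10.0.10.106:9092,10.0.20.27:9092,10.0.0.119:9092 acks=all

# Consumer benchmark: read 1M messages back
kafka-consumer-perf-test.sh --broker-list 10.0.10.106:9092,10.0.20.27:9092,10.0.0.119:9092 --topic jsontest --messages 1000000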

Regarding the maximum number of brokers supported in a cluster, it is better to ask AWS through their support system.


Section 1: Pre-install/set up additional Kafka components:

Launch an EC2 instance in the MSK cluster's VPC/subnet.

Log in to the EC2 instance.

Set up the yum repo to download Confluent Kafka component packages through yum:

sudo yum install curl which
sudo rpm --import https://packages.confluent.io/rpm/5.1/archive.key

Navigate to /etc/yum.repos.d/, create a file named confluent.repo, and add the following contents:

[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.1/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.1
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1 

Next, clean the yum repo:

sudo yum clean all 

Allow the security group of the EC2 instance in the MSK cluster's security group's inbound rules for ports 9092 (broker connections) and 2181 (ZooKeeper connections).
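A hedged sketch of doing this with the AWS CLI (the security group IDs are placeholders for the MSK cluster's group and the EC2 instance's group):

aws ec2 authorize-security-group-ingress --group-id <msk cluster sg id> --protocol tcp --port 9092 --source-group <ec2 instance sg id>
aws ec2 authorize-security-group-ingress --group-id <msk cluster sg id> --protocol tcp --port 2181 --source-group <ec2 instance sg id>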

Section 2: Commands to get the MSK cluster's broker and ZooKeeper URL/IP info

ZooKeeper connection URL/port:

aws kafka describe-cluster --region us-east-1 --cluster-arn <cluster arn>

Broker connection URL/port:

aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn <cluster arn>
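If you have jq installed, a hedged sketch for pulling just the connection strings out of the JSON responses (field names as returned by the MSK API at the time of writing):

aws kafka describe-cluster --region us-east-1 --cluster-arn <cluster arn> | jq -r '.ClusterInfo.ZookeeperConnectString'
aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn <cluster arn> | jq -r '.BootstrapBrokerString'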

----------------------------------------------------------------------

Note:

MSK overview and components settings:

[MSK reference architecture diagram]

  • Kindly refer to the MSK high-level architecture for how to set up the various components (REST proxy, Schema Registry, sticky load balancer, etc.) and how it connects with other AWS services. It's just a simple reference architecture.

  • Also, instead of setting up the REST proxy, Schema Registry, and KSQL directly on EC2 instances, you can run them in Docker containers.

  • And if you are setting up multiple REST proxies, you need to put the rest-proxy service behind a sticky load balancer (e.g. nginx using IP hash) to make sure the same client consumer maps to the same consumer group, avoiding data-fetching mismatches/inconsistencies across reads.

Hope you find the above information useful!

Theirs answered 28/3, 2019 at 23:44 Comment(11)
I haven't tried this, but I think you can create an MSK cluster in public subnets and then access the brokers from outside the VPC using their public endpoints (after attaching a security group that permits traffic from your source IP)? In the case of private subnets, VPC peering is an option too (which is what we do)Diaconicon
processing.guarantee does not seem to be available docs.aws.amazon.com/msk/latest/developerguide/…Strut
@Kuntal-G, Thank you for your detailed answer. Does Amazon MSK support Kafka Connect?Canine
@Strut Most of the allowable/configurable properties mentioned in the MSK documentation are topic-specific (docs.aws.amazon.com/msk/latest/developerguide/…). Whereas processing.guarantee is a Kafka Streams property that enables exactly-once semantics, and you can set it in the Kafka Streams code. Although I haven't tested it on my side, Kafka Streams is basically a client/consumer of the MSK cluster, and the Streams-specific properties should ideally work.Theirs
@Canine Yes, you can use any component that leverages the Apache Kafka producer and consumer APIs, such as Kafka Connect, Kafka Streams, and the Apache Kafka AdminClient. But tools that upload .jar files into Apache Kafka clusters are currently not compatible with Amazon MSK, including Confluent Control Center, Confluent Auto Data Balancer, Uber uReplicator, and LinkedIn Cruise Control.Theirs
Your answer for #1 is not accurate. If you have a VPC set up, you can connect to the cluster from on-prem.Diedra
@Diaconicon - From the Amazon MSK FAQs: Q: Is the connection between my clients and an Amazon MSK cluster always private? Yes, the only way data can be produced and consumed from an Amazon MSK cluster is over a private connection between your clients in your VPC and the Amazon MSK cluster. Amazon MSK does not support public endpoints.Mylesmylitta
Hey @Kuntal-G, we are currently implementing kafka-rest-proxy for AWS managed Kafka, but we are facing a communication issue: {"error_code":50002,"message":"Kafka error: null"}. I have configured the broker and zookeeper IPs, but the issue still occurs. Can you please suggest something?Dollie
@Kuntal-G, What will be the maximum record size if we use MSK? For Kafka, the default is 1MB and it can be configured. With Kinesis, it's 1MB maximum.Crescentia
@R.Narasimmavarman I'm not sure about your setup details, but from the error it looks like a standard "Internal Server Error". Please verify that you are setting up the rest proxy in the same VPC as the MSK cluster. Also, before running the Confluent kafka-rest-proxy, try running a Kafka client to connect to MSK from the EC2/ECS instance where you are setting up the proxy.Theirs
@Crescentia The default MSK payload/message size is 1MB, like Kafka. But you can configure "message.max.bytes" and "replica.fetch.max.bytes" to change those values in MSK. MSK provides custom configuration changes for some of the Kafka config parameters. Refer: docs.aws.amazon.com/msk/latest/developerguide/…Theirs
