AWS MSK - Timeout when creating a Kafka topic with ACLs turned on

I'm using AWS MSK and I want to enable ACLs, but I'm unable to create a topic when ACLs are turned on. I'm using the command-line tools for all the operations. Here's a summary of what I'm doing:

  • Create a fresh cluster
  • Create a topic - this works fine
  • Turn on ACLs for client1 on resource=CLUSTER and operation=ALL
  • Create a topic using AdminClient (by providing the --bootstrap-server option) - this gives a timeout exception
  • Retry creating the same topic - this gives an error saying the topic already exists
  • List topics using AdminClient - this returns no topics
  • Create a topic using the ZooKeeper connection (--zookeeper) - this works
  • List topics using the ZooKeeper connection - this returns all the topics I've created (even those that timed out)

So the issue is that the topic is being created in ZooKeeper, but the brokers can't access it, presumably because of some ACL rule that I'm missing.
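
One way to see the mismatch described above is to compare the two listings directly. The sketch below is hypothetical (the function name and the KAFKA_TOPICS variable are not part of Kafka); it runs the same two kafka-topics.sh --list commands as the question and prints every topic that the ZooKeeper listing contains but the --bootstrap-server (AdminClient) listing does not.

```shell
#!/bin/sh
# Path to kafka-topics.sh; assumed to be run from kafka_2.12-2.2.1/bin as in the question.
KAFKA_TOPICS="${KAFKA_TOPICS:-./kafka-topics.sh}"

# Hypothetical helper: topics_missing_from_adminclient BOOTSTRAP ZK CLIENT_CONFIG
# Prints topics that ZooKeeper knows about but the brokers do not return to this client.
topics_missing_from_adminclient() {
  local bootstrap="$1" zk="$2" client_config="$3"
  local via_admin via_zk
  # Listing through the brokers (AdminClient), authenticated as the client in CLIENT_CONFIG.
  via_admin=$("$KAFKA_TOPICS" --bootstrap-server "$bootstrap" \
    --command-config "$client_config" --list)
  # Listing read directly from ZooKeeper, which does not go through broker-side Kafka ACLs.
  via_zk=$("$KAFKA_TOPICS" --zookeeper "$zk" --list)
  # Print each ZooKeeper topic that has no exact match in the AdminClient listing.
  printf '%s\n' "$via_zk" | while IFS= read -r topic; do
    [ -n "$topic" ] || continue
    printf '%s\n' "$via_admin" | grep -qxF -- "$topic" || printf '%s\n' "$topic"
  done
}
```

With the question's variables it would be called as `topics_missing_from_adminclient "$B" "$ZK" ~/client1.properties`; given the two listings shown below, it would print all five topic names, test through test5.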

Raw output of the commands that I've run:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1

Error while executing topic command : org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
[2019-09-30 17:16:19,389] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
        at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
        at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
 (kafka.admin.TopicCommand$)

Running the same command again:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1

Error while executing topic command : org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
[2019-09-30 17:25:38,266] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
        at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
        at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
 (kafka.admin.TopicCommand$)

List of topics via AdminClient:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties --list


List of topics via the ZooKeeper connection:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --zookeeper $ZK --command-config ~/client1.properties --list
test
test2
test3
test4
test5

Here are my ACL rules:

Current ACLs for resource `ResourcePattern(resourceType=CLUSTER, name=kafka-cluster, patternType=LITERAL)`:
        (principal=User:CN=client1.com, host=*, operation=ALL, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=--operation=All, patternType=LITERAL)`:
        (principal=User:CN=client1.com, host=*, operation=ALL, permissionType=ALLOW)

What am I missing?
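
In the ACL listing above, the second entry is attached to a topic literally named `--operation=All`, not to all topics, which suggests the --topic argument was malformed when that ACL was added. Since the brokers only return topics the client is allowed to Describe, the lack of a working topic ACL for User:CN=client1.com could also be part of why the AdminClient --list comes back empty. A minimal sketch of a wildcard topic grant, assuming Apache Kafka's kafka-acls.sh and a hypothetical grant_all_topics wrapper:

```shell
#!/bin/sh
# Path to kafka-acls.sh from the Apache Kafka distribution (assumed location).
KAFKA_ACLS="${KAFKA_ACLS:-./kafka-acls.sh}"

# Hypothetical helper: grant_all_topics ZK PRINCIPAL
# Adds an ALLOW ACL for operation All on every topic (the '*' wildcard resource).
grant_all_topics() {
  local zk="$1" principal="$2"
  # '*' is quoted so the shell passes it literally instead of expanding file names.
  "$KAFKA_ACLS" --authorizer-properties "zookeeper.connect=$zk" \
    --add --allow-principal "$principal" \
    --operation All --topic '*'
}
```

For example, `grant_all_topics "$ZK" User:CN=client1.com`. All is a broad grant; narrower operations such as Read, Write or Describe can be granted instead if the client needs less.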

Maidy asked 30/9, 2019 at 18:08

I don't think this has anything to do with AWS MSK specifically; it's an issue with your secured Kafka cluster's configuration. In a secured cluster, both clients (producers/consumers) and inter-broker actions require authorization. You'd have the same issue in a self-managed Kafka cluster.

The recommended approach is to set up "superuser" users (I'd call them service accounts) on the servers and then give those users ACLs that allow the inter-broker interactions your cluster needs. The exact ACLs you need will vary depending on your use cases and security preferences.

In server.properties you'd add an entry like super.users=User:BrokerService, as documented at https://docs.confluent.io/current/kafka/authorization.html#kafka-auth-superuser. The documentation uses Alice and Bob as superuser names, which I find confusing; pick whatever user name makes sense for you.

Then you need to set up a similar ACL whose principal is the "superuser" user you created above, e.g. principal=User:BrokerService, granting whatever permissions the brokers need. It sounds like your immediate use case is to ALLOW READ on all topics. You'll probably need other ACLs for inter-broker communication as well, but I can't tell you exactly what you need without more information about what you want to do.

For example, this command sets up the ACL:

kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \
--allow-principal User:BrokerService --operation All --topic '*' --cluster
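
After adding ACLs, it's worth listing them to check that they landed on the intended resources; the "Current ACLs for resource ..." blocks in the question look like this listing's output. A minimal sketch, assuming the Apache Kafka kafka-acls.sh script (Confluent packages ship it as kafka-acls, as in the command above) and a hypothetical list_acls wrapper that passes any extra arguments, such as --cluster or --topic '*', through to narrow the listing:

```shell
#!/bin/sh
# Path to kafka-acls.sh from the Apache Kafka distribution (assumed location).
KAFKA_ACLS="${KAFKA_ACLS:-./kafka-acls.sh}"

# Hypothetical helper: list_acls ZK [RESOURCE OPTIONS...]
# With no resource options, kafka-acls.sh --list prints the ACLs for every resource.
list_acls() {
  local zk="$1"
  shift
  "$KAFKA_ACLS" --authorizer-properties "zookeeper.connect=$zk" --list "$@"
}
```

For example, `list_acls "$ZK" --cluster` shows only the ACLs on the cluster resource.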

More options for setting up ACLs, along with a description of your exact problem, are documented here: https://docs.confluent.io/current/kafka/authorization.html#acl-format

Again, please research some more, or edit your question if you are looking for an exact configuration, since the ACLs you choose have security and use-case implications.

Aeciospore answered 30/9, 2019 at 20:31

Comments:
MSK doesn't give direct access to edit the server.properties file. I need to figure out which principal is used for inter-broker communication and add an ACL rule for it. I'll raise this with AWS support. - Maidy
Fixed this by adding an ACL for resource "cluster" and operation "All" for User:*.test.etg92k.c2.kafka.us-east-1.amazonaws.com, which is the brokers' DNS name. - Maidy
Very nice, thank you for sharing. How did you figure this out? I didn't see it in the docs and reached out to our solutions architect. - Aeciospore
AWS support suggested trying out this solution. - Maidy
@Maidy I'm also facing this issue now. I created cluster ACLs with my admin user and added the cluster ACLs for the broker service as you suggested, but I still get the same result. Could you please share the exact ACLs for your admin user and the broker service? - Wheeled
@Wheeled the MSK docs have been updated with instructions on how to set ACLs correctly. See docs.aws.amazon.com/msk/latest/developerguide/msk-acls.html - Maidy
I looked at the link, but was still missing one ACL. I opened a ticket with AWS and resolved it: we needed to add one more ACL for User:ANONYMOUS. - Wheeled
Is there an option to do something like this for the SASL/SCRAM authentication method? - Beane
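
The fix Maidy describes in the comments (an ALLOW ACL for operation All on the cluster resource, granted to the principal the MSK brokers authenticate as) can be expressed as a single kafka-acls.sh call. This is a minimal, hypothetical wrapper around Apache Kafka's kafka-acls.sh; the principal in the usage line below is copied from that comment and belongs to that particular cluster, so take the value for your own cluster from the MSK ACL documentation linked in the comments.

```shell
#!/bin/sh
# Path to kafka-acls.sh from the Apache Kafka distribution (assumed location).
KAFKA_ACLS="${KAFKA_ACLS:-./kafka-acls.sh}"

# Hypothetical helper: grant_broker_cluster_all ZK BROKER_PRINCIPAL
# Adds an ALLOW ACL for operation All on the cluster resource (kafka-cluster).
grant_broker_cluster_all() {
  local zk="$1" broker_principal="$2"
  "$KAFKA_ACLS" --authorizer-properties "zookeeper.connect=$zk" \
    --add --allow-principal "$broker_principal" \
    --operation All --cluster
}
```

For example, `grant_broker_cluster_all "$ZK" 'User:*.test.etg92k.c2.kafka.us-east-1.amazonaws.com'`. The single quotes stop the shell from treating the `*` as a file-name pattern, and `--cluster` targets the single kafka-cluster resource seen in the question's ACL listing.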
