Problems with Amazon MSK default configuration and publishing with transactions

Recently we started testing our Kafka connectors against MSK, Amazon's managed Kafka service. Publishing records seems to work fine, but not when transactions are enabled.

Our cluster consists of 2 brokers (because we have 2 zones) using the default MSK configuration. We are creating our Java Kafka producer using the following properties:

bootstrap.servers=x.us-east-1.amazonaws.com:9094,y.us-east-1.amazonaws.com:9094
client.id=kafkautil
max.block.ms=5000
request.timeout.ms=5000
security.protocol=SSL
transactional.id=transactions 

However when the producer was started with the transactional.id setting which enables transactions, the initTransactions() method hangs:

producer = new KafkaProducer<Object, Object>(kafkaProperties);
if (kafkaProperties.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) {
    // this hangs
    producer.initTransactions();
}

In the log output we see a continuous stream of the following, and the call never seems to time out.

TransactionManager - Enqueuing transactional request (type=FindCoordinatorRequest,
    coordinatorKey=y, coordinatorType=TRANSACTION)
TransactionManager - Request (type=FindCoordinatorRequest, coordinatorKey=y,
    coordinatorType=TRANSACTION) dequeued for sending
NetworkClient - Found least loaded node z:9094 (id: -2 rack: null) connected with no
    in-flight requests
Sender - Sending transactional request (type=FindCoordinatorRequest, coordinatorKey=y,
    coordinatorType=TRANSACTION) to node z (id: -2 rack: null)
NetworkClient - Sending FIND_COORDINATOR {coordinator_key=y,coordinator_type=1} with
    correlation id 424 to node -2
NetworkClient - Completed receive from node -2 for FIND_COORDINATOR with
    correlation id 424, received {throttle_time_ms=0,error_code=15,error_message=null,
    coordinator={node_id=-1,host=,port=-1}}
TransactionManager LogContext.java:129 - Received transactional response
    FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', 
    error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null)) for request
    (type=FindCoordinatorRequest, coordinatorKey=xxx, coordinatorType=TRANSACTION)

As far as I can determine, the brokers are available and each of the hosts in the bootstrap.servers property is reachable. If I connect to each of them and publish without transactions, it works.

Any idea what we are missing?

Daliadalila answered 15/1, 2021 at 22:47

However when the producer was started with the transactional.id setting which enables transactions, the initTransactions() method hangs:

This turned out to be a problem with the default AWS MSK properties and the number of brokers. If you create a Kafka cluster with fewer than 3 brokers, the following settings need to be adjusted.
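The COORDINATOR_NOT_AVAILABLE responses in the question's log are consistent with the brokers being unable to create the internal transaction topic, since a topic cannot have more replicas than there are brokers. A minimal sketch of that check, using only the JDK: ReplicationCheck and findProblems are hypothetical names for illustration, not part of Kafka or MSK, and the property values are the AWS defaults listed in this answer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of Kafka or MSK.
final class ReplicationCheck {

    // Broker properties whose value is a replication factor.
    private static final String[] REPLICATION_KEYS = {
        "default.replication.factor",
        "offsets.topic.replication.factor",
        "transaction.state.log.replication.factor"
    };

    // Returns one message per setting that asks for more replicas than there are brokers.
    static List<String> findProblems(Properties brokerProps, int brokerCount) {
        List<String> problems = new ArrayList<>();
        for (String key : REPLICATION_KEYS) {
            String value = brokerProps.getProperty(key);
            if (value == null) {
                continue; // not set in this Properties object, nothing to check
            }
            int factor = Integer.parseInt(value.trim());
            if (factor > brokerCount) {
                problems.add(key + "=" + factor + " exceeds broker count " + brokerCount);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // AWS defaults as listed in this answer, checked against a 2-broker cluster.
        Properties awsDefaults = new Properties();
        awsDefaults.setProperty("default.replication.factor", "3");
        awsDefaults.setProperty("min.insync.replicas", "2");
        awsDefaults.setProperty("offsets.topic.replication.factor", "3");
        awsDefaults.setProperty("transaction.state.log.replication.factor", "3");
        findProblems(awsDefaults, 2).forEach(System.out::println);
    }
}
```

Run against a 2-broker cluster, this reports all three replication factors as too high, including transaction.state.log.replication.factor, the one that matters for initTransactions().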

The following settings should be set (I think) to the number of brokers:

Property                                  Kafka default  AWS default  Should be (1)  Description
default.replication.factor                1              3            2              Default replication factor for automatically created topics.
min.insync.replicas                       1              2            2              Minimum number of replicas that must acknowledge a write for the write to be considered successful.
offsets.topic.replication.factor          3              3            2              Replication factor for the internal topic that stores consumer offsets.
transaction.state.log.replication.factor  3              3            2              Replication factor for the transaction topic.

Here's the Kafka docs on broker properties.

(1) Because we have 2 brokers, we ended up with:

default.replication.factor=2
min.insync.replicas=2
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2

This seemed to resolve the issue. IMHO this is a real problem with AWS MSK and its default configuration. AWS should generate the default configuration automatically and tune it to the number of brokers in the cluster.
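As a rough sketch of what broker-count-aware defaults could look like, the snippet below caps each setting at the number of brokers. The class and method names are hypothetical, and the caps of 3 and 2 are an assumption taken from the AWS defaults in the table above; this is not something MSK provides.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper illustrating broker-count-aware defaults; not an MSK feature.
final class BrokerAwareDefaults {

    // Caps each setting at the broker count so the cluster can always satisfy it.
    static Map<String, Integer> forBrokerCount(int brokerCount) {
        if (brokerCount < 1) {
            throw new IllegalArgumentException("brokerCount must be at least 1");
        }
        Map<String, Integer> settings = new LinkedHashMap<>();
        // Assumption: start from the AWS defaults (3, 2, 3, 3) and never exceed brokerCount.
        settings.put("default.replication.factor", Math.min(3, brokerCount));
        settings.put("min.insync.replicas", Math.min(2, brokerCount));
        settings.put("offsets.topic.replication.factor", Math.min(3, brokerCount));
        settings.put("transaction.state.log.replication.factor", Math.min(3, brokerCount));
        return settings;
    }

    public static void main(String[] args) {
        // Print the settings in properties-file form for a 2-broker cluster.
        forBrokerCount(2).forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

For 2 brokers this yields the four values we ended up with, and for 3 or more it yields the AWS defaults. Note that when min.insync.replicas equals the replication factor, as in the 2-broker case, there is no headroom: with acks=all, a partition whose in-sync replica count drops below min.insync.replicas rejects writes.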

Daliadalila answered 15/1, 2021 at 22:49 Comment(2)
With the configuration that you posted, if one of the brokers goes down and your producers are using acks=all (-1), writes won't be accepted, right? - Metopic
I'm not 100% sure that the write to the broker that is up won't be accepted, @bilby91. You may have to turn on transactions to get that level of cluster consistency. - Daliadalila
