Kafka producer throws an error Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION

2

11

My Kafka producer is throwing the error "Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION". Here is what I am trying to do:

KafkaProducer producer = new KafkaProducer<>(props);
producer.initTransactions();
//transaction 1
producer.beginTransaction();
//send some messages
producer.commitTransaction();

//transaction 2
producer.beginTransaction(); //here it throws an exception "Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION".
//send some messages
producer.commitTransaction();

producer.close();

If I call producer.initTransactions(); again before starting transaction 2, it throws an exception "Invalid transition attempted from state READY to state INITIALIZING".

What am I doing wrong?

Narceine answered 22/1, 2020 at 16:48 Comment(0)
1

producer.initTransactions();

This registers the producer with the broker as one that can use transactions, identifying it by its transactional.id and a sequence number, or epoch. In turn, the broker will use these to write-ahead any actions to a transaction log.

And consequently, the broker will remove any actions from that log that belong to a producer with the same transaction id and earlier epoch, presuming them to be from defunct transactions.

https://www.baeldung.com/kafka-exactly-once

So initTransactions() should be called only once per producer instance, before the first transaction. I call it in the constructor itself, right after I initialise the producer:

Properties properties = getProperties();
kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties);
kafkaProducer.initTransactions();
return kafkaProducer;
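
Once initTransactions() has run, the same producer can be reused for any number of transactions. Below is a minimal sketch of that pattern. It assumes a hypothetical TxProducer interface (not part of Kafka) that mirrors the beginTransaction(), commitTransaction() and abortTransaction() methods of KafkaProducer, so the example runs without a broker; TxRunner, runInTransaction and RecordingProducer are illustrative names as well. Aborting on any RuntimeException is a simplification: the KafkaProducer Javadoc treats errors such as ProducerFencedException as fatal, in which case the producer should be closed rather than the transaction aborted.

```java
import java.util.ArrayList;
import java.util.List;

class TxRunner {
    // Hypothetical interface for illustration; KafkaProducer has methods with these names.
    interface TxProducer {
        void beginTransaction();
        void commitTransaction();
        void abortTransaction();
    }

    // Runs one transaction on an already-initialised producer.
    // Every begin is paired with either a commit or an abort.
    static void runInTransaction(TxProducer producer, Runnable sends) {
        producer.beginTransaction();
        try {
            sends.run();                  // send some messages
            producer.commitTransaction();
        } catch (RuntimeException e) {
            producer.abortTransaction();  // simplification: fatal errors need close() instead
            throw e;
        }
    }

    // Fake producer that only records which calls were made, in order.
    static class RecordingProducer implements TxProducer {
        final List<String> calls = new ArrayList<>();
        public void beginTransaction() { calls.add("begin"); }
        public void commitTransaction() { calls.add("commit"); }
        public void abortTransaction() { calls.add("abort"); }
    }

    public static void main(String[] args) {
        RecordingProducer producer = new RecordingProducer();
        runInTransaction(producer, () -> { });   // transaction 1
        try {
            runInTransaction(producer, () -> { throw new IllegalStateException("send failed"); });
        } catch (IllegalStateException expected) {
            // transaction 2 was aborted; the producer is still usable
        }
        runInTransaction(producer, () -> { });   // transaction 3
        System.out.println(producer.calls);
        // prints [begin, commit, begin, abort, begin, commit]
    }
}
```

With a real KafkaProducer, a small adapter that forwards these three calls would be needed; the point is only that no transaction is left open when the next one begins.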
Epoxy answered 17/9, 2021 at 12:56 Comment(0)
-2

Just create a new producer for each transaction.

KafkaProducer producer = new KafkaProducer<>(props);
producer.initTransactions();
//transaction 1
producer.beginTransaction();
//send some messages
producer.commitTransaction();
producer.close();

// reuse the variable: declaring `producer` a second time in the same scope would not compile
producer = new KafkaProducer<>(props);
producer.initTransactions();

//transaction 2
producer.beginTransaction();
//send some messages
producer.commitTransaction();

producer.close();
Flow answered 5/4, 2021 at 14:23 Comment(1)
Hi, creating a new producer for every transaction looks impractical. Any idea why it throws this error? Once it commits the transaction, shouldn't it be ready to start another? Agripinaagrippa
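
One way to see where these messages can come from is a toy model of the state checks. The sketch below is not the Kafka client's code: the state names READY, IN_TRANSACTION and INITIALIZING are taken from the error messages in the question, while UNINITIALIZED, COMMITTING_TRANSACTION, the transition table and the use of IllegalStateException are assumptions made for illustration. In this model a completed commit returns the producer to READY, so the IN_TRANSACTION to IN_TRANSACTION message appears only when beginTransaction() is reached while the previous transaction was never committed or aborted.

```java
import java.util.EnumSet;
import java.util.Set;

// Toy model only: a simplified stand-in for a producer's transaction state checks.
class TxStateModel {
    enum State { UNINITIALIZED, INITIALIZING, READY, IN_TRANSACTION, COMMITTING_TRANSACTION }

    private State state = State.UNINITIALIZED;

    // States from which each target state may be entered (assumed, simplified).
    private static Set<State> allowedSources(State target) {
        switch (target) {
            case INITIALIZING:           return EnumSet.of(State.UNINITIALIZED);
            case READY:                  return EnumSet.of(State.INITIALIZING, State.COMMITTING_TRANSACTION);
            case IN_TRANSACTION:         return EnumSet.of(State.READY);
            case COMMITTING_TRANSACTION: return EnumSet.of(State.IN_TRANSACTION);
            default:                     return EnumSet.noneOf(State.class);
        }
    }

    private void transitionTo(State target) {
        if (!allowedSources(target).contains(state)) {
            throw new IllegalStateException(
                "Invalid transition attempted from state " + state + " to state " + target);
        }
        state = target;
    }

    void initTransactions()  { transitionTo(State.INITIALIZING); transitionTo(State.READY); }
    void beginTransaction()  { transitionTo(State.IN_TRANSACTION); }
    void commitTransaction() { transitionTo(State.COMMITTING_TRANSACTION); transitionTo(State.READY); }
    State state()            { return state; }

    public static void main(String[] args) {
        TxStateModel producer = new TxStateModel();
        producer.initTransactions();
        producer.beginTransaction();
        producer.commitTransaction();
        producer.beginTransaction();      // fine: the commit returned the model to READY
        try {
            producer.beginTransaction();  // no commit or abort in between
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
            // prints Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
        }

        TxStateModel other = new TxStateModel();
        other.initTransactions();
        try {
            other.initTransactions();     // second init on the same instance
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
            // prints Invalid transition attempted from state READY to state INITIALIZING
        }
    }
}
```

If the real client behaves like this model, the thing to check is whether commitTransaction() (or abortTransaction()) actually ran before the second beginTransaction(), for example whether an exception skipped it.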
