KafkaStreams - InconsistentGroupProtocolException

I have a Kafka Streams app that connects to our Kafka cluster using the Kafka Streams DSL, like so:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, byte[]> stream = builder.stream(myTopic);

// do work

kStreams = new KafkaStreams(builder, config);
kStreams.start();

And another part of my code base that establishes a connection to our cluster using the consumer client directly.

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, keyDeserializer, valueDeserializer);
consumer.subscribe(Collections.singletonList(sourceTopic));
consumer.poll(500L);
// etc
consumer.close();

The reason I am doing this is to gather metadata about the consumer group before conditionally kicking off other parts of the app (which includes the Kafka Streams topology). There are probably other ways to do this (e.g., through various hooks or whatnot), but I am more curious about why intermixing these two approaches sometimes (intermittently) leads to an InconsistentGroupProtocolException being thrown.
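
(For reference: on newer client versions, where the Java AdminClient API is available, this kind of group metadata can be read without joining the group at all, which avoids the protocol clash entirely. A rough sketch, with placeholder broker and group names:)

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder broker

try (AdminClient admin = AdminClient.create(adminProps)) {
    // read the committed offsets of the group without ever joining it
    Map<TopicPartition, OffsetAndMetadata> committed =
        admin.listConsumerGroupOffsets("my-streams-app")                      // placeholder group id
             .partitionsToOffsetAndMetadata()
             .get();
    committed.forEach((tp, offset) ->
        System.out.println(tp + " -> " + offset.offset()));
}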

Could someone please shed some light on why this is being thrown? I'm having a difficult time determining what exactly is going on from the source code itself, but my guess is that the underlying consumers constructed by Kafka Streams specify a different partition assignment protocol than the plain KafkaConsumer client does. Anyway, any help in understanding this exception would be greatly appreciated.

Whomever answered 11/1, 2017 at 22:20 Comment(3)
What are you trying to accomplish? Westberg
issues.apache.org/jira/browse/KAFKA-4113. See the comment from 03/Jan/17. I am experiencing this issue and thought this was the easiest solution. Whomever
I see. I guess it might work, as long as you can make sure that only the consumer OR the Streams application is active at any one time. Thus, before you start one or the other, make sure that there are no members in the consumer group -- i.e., the group is not active. Cf. github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/… (Be aware, though, that this call is not part of the public API and can be changed at any point without notice.) Westberg
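
(The comment above refers to an internal Scala class; on newer client versions the same check can be done through the public Java AdminClient. A rough sketch, with placeholder broker and group names, that verifies the group has no active members before starting the other client:)

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder broker

try (AdminClient admin = AdminClient.create(adminProps)) {
    ConsumerGroupDescription group =
        admin.describeConsumerGroups(Collections.singletonList("my-streams-app"))  // placeholder group id
             .describedGroups()
             .get("my-streams-app")
             .get();

    if (group.members().isEmpty()) {
        // no active members -- safe to start either the plain consumer or the Streams app
    }
}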

You basically answered it yourself. Kafka Streams uses a custom partition assignor, and a Kafka Streams client only works together with other Kafka Streams clients. If you use a plain KafkaConsumer with the same group ID as your Kafka Streams app, the group coordinator cannot find an assignment protocol common to all members, so it rejects the join and fences the plain KafkaConsumer out of the Kafka Streams consumer group. In short, a plain KafkaConsumer cannot "play" in the same group as Kafka Streams.
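
To make the mismatch concrete, here is a sketch (not the actual internal wiring; the Streams assignor is an internal class whose name and protocol identifier depend on the version, and the ids below are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.streams.StreamsConfig;

// Plain consumer: advertises whatever assignors are configured -- RangeAssignor by default.
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-app");   // same id as the Streams app
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        RangeAssignor.class.getName());

// Kafka Streams: application.id doubles as the group.id, and Streams replaces
// partition.assignment.strategy on its internal consumers with its own assignor.
Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");

// When both clients try to join the group "my-app", their advertised protocols have
// nothing in common, so the coordinator answers the later join request with
// InconsistentGroupProtocolException.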

Westberg answered 11/1, 2017 at 22:41 Comment(2)
Does this only concern actively running consumers? Can I shut down all "vanilla" consumers and then start up the Streams app with the same group id? Otherwise it's quite a hassle to migrate to Streams, especially without losing data (offsets). Ministry
AFAIK, it only affects active groups. It should be possible to switch from a plain consumer app to a Kafka Streams app, reusing the group.id as the application.id. You won't be able to do a rolling upgrade, but if you shut down the consumer app completely, it should be possible to restart it as a Kafka Streams app. Westberg
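
(A rough sketch of that migration, using the same old-style Streams API as in the question; the broker, topic, and ids are placeholders. Setting application.id to the old group.id lets the Streams app resume from the offsets the plain consumer committed:)

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-old-consumer-group");  // the former group.id
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

// start this only after the old consumer app has been shut down completely
KStreamBuilder builder = new KStreamBuilder();
KStream<String, byte[]> stream = builder.stream("my-topic");
// ... topology ...
KafkaStreams kStreams = new KafkaStreams(builder, config);
kStreams.start();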

All consumers that belong to the same group must declare at least one common partition assignment strategy. If a consumer attempts to join a group with an assignment configuration that is inconsistent with the other group members, you will end up with this exception.

The partition assignment configuration (the consumer's partition.assignment.strategy setting) is described in the Kafka consumer configuration documentation.
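
(A small sketch; the group id and assignor choice are only examples. Every member of the group has to list at least one assignor in common:)

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;

Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
// every consumer that joins "my-group" must include at least one of the same assignors
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        RoundRobinAssignor.class.getName());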

Tetanus answered 12/11, 2022 at 14:27 Comment(0)
