I have a Kafka Streams app that connects to our Kafka cluster using the Kafka Streams DSL, like so:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, byte[]> stream = builder.stream(myTopic);
// do work
kStreams = new KafkaStreams(builder, config);
kStreams.start();
Another part of my code base establishes a connection to our cluster using the consumer client directly:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, keyDeserializer, valueDeserializer);
consumer.subscribe(Collections.singletonList(sourceTopic));
consumer.poll(500L);
// etc
consumer.close();
The reason I am doing this is to gather metadata about the consumer group before conditionally kicking off other parts of the app (which include the Kafka Streams topology). There are probably other ways to do this (e.g. through various hooks or whatnot), but I am more curious about why mixing these two approaches will intermittently lead to an InconsistentGroupProtocolException being thrown.
Could someone please shed some light on why this is being thrown? I'm having a difficult time determining exactly what is going on from the source code itself, but my guess is that the underlying consumers constructed by Kafka Streams specify a different partition assignment protocol than the KafkaConsumer client does. Anyway, any help in understanding this exception will be greatly appreciated.
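The guess above can be illustrated with a toy model. This is only a sketch in plain Java, not Kafka's actual classes: `Group`, `InconsistentProtocolError`, and the member ids are hypothetical names, and the protocol names "stream" and "range" are assumed stand-ins for what a Kafka Streams consumer and a default KafkaConsumer might advertise. It also assumes both clients end up with the same group id. The rule it imitates is that every member of one consumer group must share at least one assignment protocol, so a join is rejected only while an incompatible member is still in the group, which may be why the failure looks intermittent.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model only: none of these types come from the Kafka client library.
public class GroupProtocolSketch {

    // Hypothetical stand-in for the error a group coordinator would report.
    static class InconsistentProtocolError extends RuntimeException {
        InconsistentProtocolError(String message) {
            super(message);
        }
    }

    // Hypothetical consumer group that tracks the protocols supported by ALL members.
    static class Group {
        private Set<String> common = null; // null until the first member joins

        void join(String memberId, List<String> supportedProtocols) {
            if (common == null) {
                // The first member defines the initial candidate set.
                common = new LinkedHashSet<>(supportedProtocols);
                return;
            }
            // Keep only the protocols the new member also supports.
            Set<String> candidate = new LinkedHashSet<>(common);
            candidate.retainAll(supportedProtocols);
            if (candidate.isEmpty()) {
                throw new InconsistentProtocolError(
                    memberId + " supports " + supportedProtocols
                        + " but the group requires one of " + common);
            }
            common = candidate;
        }
    }

    public static void main(String[] args) {
        Group group = new Group();
        // Assumed: the Streams-managed consumer advertises only "stream".
        group.join("streams-thread-1", Arrays.asList("stream"));
        try {
            // Assumed: the plain consumer advertises only "range".
            group.join("plain-consumer", Arrays.asList("range"));
        } catch (InconsistentProtocolError e) {
            System.out.println("join rejected: " + e.getMessage());
        }
    }
}
```

If this model matches the real behaviour, giving the metadata-gathering consumer its own group id would keep the two clients from ever meeting in the same group. That is an untested assumption, not a confirmed fix.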