Kafka Streams KTable configuration error on Message Hub

This issue has now been fixed on Message Hub.

I am having some trouble creating a KTable in Kafka. I am new to Kafka, which is probably the root of my problem, but I thought I would ask here anyway. I have a project where I would like to keep track of different IDs by counting their total occurrences. I am using Message Hub on IBM Cloud to manage my topics, and it has worked splendidly so far.

I have a topic on Message Hub that contains messages like {"ID":"123","TIMESTAMP":"1525339553", "BALANCE":"100", "AMOUNT":"4"}. For now, the only field of relevance is ID.

My Kafka code, along with the Streams configuration, looks like this:

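import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// appId, bootstrapServers, user, password and myTopic are defined elsewhere
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Message Hub requires internal topics to be created with replication factor 3
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3");
// SASL_SSL connection settings for Message Hub
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("ssl.protocol", "TLSv1.2");
props.put("ssl.enabled.protocols", "TLSv1.2");
String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";
saslJaasConfig = saslJaasConfig.replace("USERNAME", user).replace("PASSWORD", password);
props.put("sasl.jaas.config", saslJaasConfig);

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> kstream = builder.stream(myTopic);

// Count how often each ID occurs
KTable<String, Long> eventCount = kstream
        .flatMapValues(value -> getID(value)) // getID must return an Iterable<String>, e.g. a one-element list
        .groupBy((key, value) -> value)       // re-keys by ID, which makes Streams create a -repartition topic
        .count();

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();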
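For readers new to the DSL, the topology above keeps a running count per ID. The plain-Java sketch below reproduces the final counts in memory so the intended result is clear. It assumes a hypothetical `getID` that pulls the "ID" field out of the JSON with a regular expression and returns it as a list (since `flatMapValues` expects an `Iterable`); a real application would more likely use a JSON library. It deliberately ignores partitioning, state stores, and the stream of updates a KTable emits, all of which Kafka Streams handles itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class IdCountSketch {
    // Hypothetical: finds "ID":"<value>" in a message like the one in the question.
    private static final Pattern ID_PATTERN = Pattern.compile("\"ID\"\\s*:\\s*\"([^\"]*)\"");

    // Hypothetical getID: returns a list with zero or one IDs, which suits flatMapValues.
    public static List<String> getID(String value) {
        List<String> ids = new ArrayList<>();
        Matcher matcher = ID_PATTERN.matcher(value);
        if (matcher.find()) {
            ids.add(matcher.group(1));
        }
        return ids;
    }

    // In-memory analogue of flatMapValues(getID).groupBy((key, value) -> value).count():
    // every extracted ID increments its own counter.
    public static Map<String, Long> countIds(List<String> messages) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String message : messages) {
            for (String id : getID(message)) {
                counts.merge(id, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up sample messages in the format from the question.
        List<String> messages = Arrays.asList(
                "{\"ID\":\"123\",\"TIMESTAMP\":\"1525339553\", \"BALANCE\":\"100\", \"AMOUNT\":\"4\"}",
                "{\"ID\":\"456\",\"TIMESTAMP\":\"1525339560\", \"BALANCE\":\"50\", \"AMOUNT\":\"2\"}",
                "{\"ID\":\"123\",\"TIMESTAMP\":\"1525339571\", \"BALANCE\":\"96\", \"AMOUNT\":\"1\"}");
        System.out.println(countIds(messages)); // prints {123=2, 456=1}
    }
}
```

Because `groupBy` changes the key to the ID, records with the same ID may sit in different input partitions; that is why Kafka Streams has to route them through an internal repartition topic before counting.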

When I run the code, I get the following error(s):

Exception in thread "KTableTest-e2062d11-0b30-4ed0-82b0-00d83dcd9366->StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Could not create topic KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition.

Followed by:

Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.PolicyViolationException: Invalid configuration: {segment.index.bytes=52428800, segment.bytes=52428800, cleanup.policy=delete, segment.ms=600000}. Only allowed configs: [retention.ms, cleanup.policy]

I have no idea why this error occurs or what can be done about it. Have I built the KStream and KTable incorrectly somehow? Or is the problem with Message Hub on Bluemix?

Solved:

Here is an extract from the comments below the answer I have marked as correct. It turned out my StreamsConfig was fine and that there is (for now) an issue on Message Hub's side, but there is a workaround:

It turns out Message Hub has an issue when creating repartition topics with Kafka Streams 1.1. While we work on a fix, you'll need to create the topic KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition by hand. It needs as many partitions as your input topic (myTopic), and its retention time should be set to the maximum. I'll post another comment once it's fixed.
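The settings for that hand-created topic can be gathered as in the sketch below. Kafka Streams names internal topics `<application.id>-<operatorName>-<suffix>`, which is how the application id `KTableTest` and the operator `KSTREAM-AGGREGATE-STATE-STORE-0000000003` combine into the topic name above. The class and method names are hypothetical, the partition count of `myTopic` is an input you have to look up, and `maxRetentionMs` is a placeholder for whatever maximum your Message Hub plan allows (30 days in the example is only an assumption). Only `retention.ms` and `cleanup.policy` are set, because the error lists those as the only topic configs Message Hub accepts.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical value class holding the settings for a hand-created repartition topic.
public class RepartitionTopicSpec {
    public final String name;
    public final int partitions;
    public final short replicationFactor;
    public final Map<String, String> configs;

    private RepartitionTopicSpec(String name, int partitions, short replicationFactor,
                                 Map<String, String> configs) {
        this.name = name;
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
        this.configs = configs;
    }

    // Follows the <application.id>-<operatorName>-<suffix> convention for internal topics.
    public static String repartitionTopicName(String applicationId, String operatorName) {
        return applicationId + "-" + operatorName + "-repartition";
    }

    // inputTopicPartitions: partition count of the input topic (myTopic).
    // maxRetentionMs: placeholder for the largest retention your Message Hub plan allows.
    public static RepartitionTopicSpec forWorkaround(String applicationId, String operatorName,
                                                     int inputTopicPartitions, long maxRetentionMs) {
        if (inputTopicPartitions <= 0) {
            throw new IllegalArgumentException("partition count must be positive");
        }
        // Only the two topic configs Message Hub accepts.
        Map<String, String> configs = new LinkedHashMap<>();
        configs.put("retention.ms", Long.toString(maxRetentionMs));
        configs.put("cleanup.policy", "delete");
        // Replication factor 3, as Message Hub requires.
        return new RepartitionTopicSpec(repartitionTopicName(applicationId, operatorName),
                inputTopicPartitions, (short) 3, Collections.unmodifiableMap(configs));
    }

    public static void main(String[] args) {
        // Assumed values: myTopic has 3 partitions and the plan allows 30 days of retention.
        long thirtyDaysMs = 30L * 24 * 60 * 60 * 1000;
        RepartitionTopicSpec spec = forWorkaround(
                "KTableTest", "KSTREAM-AGGREGATE-STATE-STORE-0000000003", 3, thirtyDaysMs);
        // prints KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition
        System.out.println(spec.name);
        System.out.println(spec.partitions + " partitions, RF " + spec.replicationFactor + ", " + spec.configs);
    }
}
```

With the Java `AdminClient`, these values would map onto `new NewTopic(spec.name, spec.partitions, spec.replicationFactor).configs(spec.configs)` passed to `createTopics(...)`. Keep in mind that the numeric part of the operator name is generated from the topology, so changing the topology can change the topic name Streams expects.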

Many thanks for the help!

Birdhouse answered 3/5, 2018 at 11:0
Firing: Can you add your Kafka Streams application configuration (or properties file)?

Message Hub has some restrictions on the configurations that can be used when creating topics.

From the PolicyViolationException you received, it looks like your Streams application tried to use a few configs we don't allow:

  • segment.index.bytes
  • segment.bytes
  • segment.ms

I'm guessing you set those somewhere in your Streams configuration; if so, they should be removed.
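The policy check behind the PolicyViolationException can be reproduced in plain Java to see which requested keys are rejected. This is a sketch, not Message Hub's actual implementation: the allowed set is copied from the error message, and the class and method names are hypothetical. The three rejected keys also appear to match defaults that Kafka Streams 1.1 applies to repartition topics on its own, which fits the later finding that the application configuration was fine.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class TopicPolicyCheck {
    // Allowed keys as listed in the PolicyViolationException message.
    static final Set<String> ALLOWED = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList("retention.ms", "cleanup.policy")));

    // Returns the requested config keys the policy would reject, sorted for stable output.
    public static Set<String> disallowedKeys(Map<String, String> requested) {
        Set<String> rejected = new TreeSet<>();
        for (String key : requested.keySet()) {
            if (!ALLOWED.contains(key)) {
                rejected.add(key);
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // The topic configuration reported in the error message.
        Map<String, String> requested = new LinkedHashMap<>();
        requested.put("segment.index.bytes", "52428800");
        requested.put("segment.bytes", "52428800");
        requested.put("cleanup.policy", "delete");
        requested.put("segment.ms", "600000");
        System.out.println(disallowedKeys(requested)); // prints [segment.bytes, segment.index.bytes, segment.ms]
    }
}
```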

Note that you also need to set StreamsConfig.REPLICATION_FACTOR_CONFIG to 3 in your config to work with Message Hub, as mentioned in our docs.

Firing answered 3/5, 2018 at 12:45
Birdhouse: Thank you for replying! I think you are definitely right. I added my Streams configuration, but I think the settings needed to make it work with Message Hub (MH) are already in place, at least according to the docs. What I do not understand is why I get "Could not create topic" when I am not trying to create one, unless a KTable counts as one? Could it be solved by allocating a topic on MH? As I wrote at the start of my question, I would like to count the occurrences of the IDs in the MH topic; does that require anything more from MH than simply listening to the topic?
Firing: Yes, your transformation logic will create "internal" topics; see kafka.apache.org/11/documentation/streams/developer-guide/…. You could create them by hand beforehand, but it's usually easier to let Streams do it. Otherwise, I think your logic looks fine.
Birdhouse: I see. So the error occurs when I try to create a KTable, since I am restricted from creating a topic this way on Message Hub. Have I understood correctly that my code retrieves the messages from an MH topic and tries to create an internal topic (for the KTable) on MH? Is there a way to alter my code so that Streams creates the KTable elsewhere? Or do I need another Kafka server to achieve what I am after? Can MH handle KTables? Sorry for all the questions; I really appreciate your help.
Firing: I managed to reproduce your issue. It turns out Message Hub has an issue when creating repartition topics with Kafka Streams 1.1. While we work on a fix, you'll need to create the topic KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition by hand. It needs as many partitions as your input topic (myTopic), and its retention time should be set to the maximum. I'll post another comment once it's fixed.
Firing: I completely forgot to post here, but this issue was fixed a few weeks ago.

© 2022 - 2024 — McMap. All rights reserved.