apache-kafka-streams Questions

1

One of my KafkaStream apps gives me the following error: Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms It is not likely a permission or connectivity issue. Because it co...
Heparin asked 16/2, 2020 at 8:37

2

I'm currently evaluating options for designing/implementing Event Sourcing + CQRS architectural approach to system design. Since we want to use Apache Kafka for other aspects (normal pub-sub messag...

6

Solved

I have a basic stream processing flow which looks like master topic -> my processing in a mapper/filter -> output topics and I am wondering about the best way to handle "bad messages". Thi...
Ponton asked 8/3, 2017 at 8:49

2

I am planning on setting up a MySQL to Kafka flow, with the end goal being to schedule a process to recalculate a mongoDB document based on the changed data. This might involve directly patching th...
Enthral asked 6/7, 2021 at 15:59

3

I'm struggling with customization of my spring kafka streams application. I have been trying to configure handling uncaught (runtime exceptions) at my KStreams. Refering to documentation https://do...

3

Solved

I have the following configuration for my Kafka Stream application Properties config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG,this.applicaionId); config.put(StreamsCo...

3

I read but I couldn't understand too much. I read that I can use KTable instead of log compaction. Or it has many more features. However, I could not find a good example in this regard. I also coul...
Pozzuoli asked 29/2, 2020 at 17:40

4

I am planning to use kafka for stream and messaging purpose. Our infrastructure is moving to java 11. So wanted to know does kafka support java 10 or java 11 ?. Thanks
Concuss asked 16/8, 2018 at 7:19

4

Solved

I am trying to create the simplest as possible hello world with Spring Cloud + Kafka Streams + Spring Boot 2. I realize I miss basic concepts. Basically, I understand that: 1 - I need to define a...

1

Solved

This is a simple, plain Kafka Streams application doing a simple record transformation configured using EXACTLY_ONCE_V2. configurationParameters.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, Strea...
Nevus asked 27/11, 2021 at 20:48

3

Solved

Is it possible to use KSQL to not only count entries of a specific column via GROUP BY but instead get an aggregate over all the entries that stream through the application? I'm searching for some...
Schiff asked 14/11, 2017 at 18:52

2

Solved

I am using Spring Cloud Streams with the Kafka Streams Binder, the functional style processor API and also multiple processors. It's really cool to configure a processing application with multiple ...

2

Solved

I have a stream that is a composite of other streams final KTable<Long, CompositeInfo> compositeInfoTable = compositeImcTable .leftJoin( compositeFundTable, (CompositeImc cimc, Composite...
Mulford asked 27/7, 2018 at 21:20

4

Solved

I am trying to control number of messages which are consumed by the KStream and I am not very succesful. I am using: max.poll.interval.ms=100 and max.poll.records=20 to get like 200 messages per ...
Watusi asked 29/6, 2017 at 6:23

1

Solved

I'm using Spring Cloud Stream with Kafka Streams. Let's say I have a processor which is a Function which converts a KStream of Strings to a KStream of CityProgrammes. It invokes an API to find the ...

2

we are starting to work with Kafka streams, our service is a very simple stateless consumer. We have tight requirements on latency, and we are facing too high latency problems when the consumer gr...
Emulate asked 16/1, 2019 at 14:5

1

Getting exception while reading from kafka topic: Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1 Caused by: org.apache.kafka.common...

1

We have a situation in which I think Kafka Streams could help, but I cannot find any documentation or examples that show how. There is one similar question I found, but it does not have any implem...
Elaina asked 5/3, 2018 at 21:0

2

Solved

Is there some explicit cleanup needed to prevent each persistent store from growing too much in size? I am currently using it for calculating aggregations in DSL API.
Pottage asked 13/12, 2018 at 20:50

1

I just upgraded our Kafka Stream application from 2.5.1 to 2.6.2. It used to work, now it doesn't. Here is the troublesome topology (I have omitted the irrelevant Serdes): val builder = new Streams...
Scherzo asked 15/5, 2021 at 11:29

1

Solved

I am using kafka version 2.4.1(recently upgraded to 2.4.1 from 2.2.0) and noticed a strange problem. Even though application(kafka streams) is down (there is no application which is running ) but...
Major asked 30/4, 2020 at 14:48

2

I am running a Kafka Stream app with Springboot 2. I would like to have my kafka stream metrics available in the prometheus format at host:8080/actuator/prometheus I don't manage to have this. I ...
Skeg asked 24/9, 2019 at 16:6

4

Solved

Had gone through multiple posts but most of them are related handling Bad messages not about exception handling while processing them. I want to know to how to handle the messages that is been rece...
Agglutinogen asked 12/7, 2018 at 7:21

2

Solved

I have a stream app with below driver code for real-time message transformation. String topicName = ... KStreamBuilder builder = new KStreamBuilder(); KStream<String, String> source = builde...
Skunk asked 22/2, 2017 at 17:24

3

Solved

I recently started learning Kafka and end up with these questions. What is the difference between Consumer and Stream? For me, if any tool/application consume messages from Kafka is a consumer i...
Oracular asked 17/5, 2017 at 3:23

© 2022 - 2025 — McMap. All rights reserved.