How to debug Kafka Streams code?
J

3

10

Has anyone managed to debug Kafka Streams code written in Java 8 using IntelliJ IDEA? I am running a simple Linesplit.java program that reads a stream from one topic, splits each line into words, and writes the result to another topic, but I have no idea where to set a breakpoint so that I can debug every message as it flows through Linesplit.java.

Linesplit.java

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class Linesplit {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        // Java 8 lambda version: split every input line into words.
        // This chain only describes the topology; the lambda body runs later, once per record.
        builder.stream("streams-input")
               .flatMapValues(value -> Arrays.asList(value.toString().split("\\W+")))
               .to("streams-output");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            // print the failure instead of swallowing it silently
            e.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }
}
Joris answered 20/6, 2018 at 18:6 Comment(5)
Not sure. Maybe move value -> Arrays.asList(value.toString().split("\\W+")) (or part of it) onto its own line? - Elizebethelizondo
Well, the problem is that the builder logic gets handed to KafkaStreams, and there seems to be no way to debug it live while sending a message through the streams-input topic. - Joris
If the breakpoint is set within the callback, it should work. Otherwise, try to set a breakpoint in the KStreamFlatMapProcessor class (which is part of the Kafka Streams library). - Elizebethelizondo
@MatthiasJ.Sax Setting a breakpoint in KStreamFlatMapProcessor did not work for me. I am running the UserRegionLambdaExample from this Confluent GitHub repo: https://github.com/confluentinc/kafka-streams-examples - Aurlie
Hard to say... If you pass in a lambda, you should actually be able to set the breakpoint on the lambda. Just make sure it is set inside the lambda body and not "outside" (i.e., on the builder code), so that the breakpoint is hit at runtime. - Elizebethelizondo
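To make the suggestion in the comments above concrete, here is a minimal sketch that moves the split logic out of the lambda and into a named method, so a line breakpoint cannot accidentally land on the builder chain. The class name LineSplitLogic and the method splitWords are hypothetical and not part of the original code; the snippet uses only the JDK so it can be run and stepped through without a broker.

```java
import java.util.Arrays;
import java.util.List;

public class LineSplitLogic {

    // Hypothetical helper: the same split as the lambda in the question,
    // moved into a named method so a breakpoint here is unambiguous.
    public static List<String> splitWords(String line) {
        String[] words = line.split("\\W+"); // set the breakpoint on this line
        return Arrays.asList(words);
    }

    public static void main(String[] args) {
        // In the topology this could be wired in as a method reference, e.g.:
        //   builder.<String, String>stream("streams-input")
        //          .flatMapValues(LineSplitLogic::splitWords)
        //          .to("streams-output");
        System.out.println(splitWords("hello kafka streams")); // prints [hello, kafka, streams]
    }
}
```

With the method reference in place, a breakpoint inside splitWords should be hit once per record while the application is running, and the method can also be exercised on its own from a unit test.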
A
3

Did you try peek?

Your example can be as follows (using peek function):

// log is assumed to be a logger available in the class (for example an SLF4J Logger)
builder
       .stream("streams-input")
       .peek((k, v) -> log.info("Observed event: {}", v))
       .flatMapValues(value -> Arrays.asList(value.toString().split("\\W+")))
       .peek((k, v) -> log.info("Transformed event: {}", v))
       .to("streams-output");

I didn't run the code but this is how I do it normally.

Arris answered 18/1, 2022 at 23:13 Comment(1)
I didn't run the code myself; please let me know if it works for you or if I should modify it. - Arris
P
0

For lambdas like these, IntelliJ offers to set the breakpoint inside the lambda body instead of on the whole line.

Let me show you an example:

Screenshot: How to set an inner breakpoint on lambda (https://i.sstatic.net/m7d7b.png)

Peculiarize answered 16/6, 2022 at 14:17 Comment(0)
W
0

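Set the breakpoint in StreamThread or StreamTask. These are internal Kafka Streams classes through which records pass while they are being processed, so a breakpoint there is hit at runtime rather than when the topology is built.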

Wellington answered 22/10, 2023 at 10:51 Comment(0)
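A recurring point in this thread is that a breakpoint on the builder chain is hit only while the topology is being defined, whereas the lambda body runs later on a different thread. The sketch below imitates that with a plain JDK executor; it is not Kafka code, and the class name LambdaThreadDemo and the thread name demo-StreamThread-1 are made up for illustration (Kafka Streams worker threads typically carry "StreamThread" in their names).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class LambdaThreadDemo {

    // Defines a lambda on the calling thread, runs it on a named worker thread,
    // and returns the name of the thread that executed the lambda body.
    public static String runOnWorker(String workerName) {
        // Stand-in for "building the topology": the lambda is only created here.
        Function<String, String> mapper = value -> Thread.currentThread().getName();

        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, workerName));
        try {
            // Stand-in for "processing a record": the lambda body executes now.
            Callable<String> task = () -> mapper.apply("some record");
            return worker.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("defined on:  " + Thread.currentThread().getName());
        System.out.println("executed on: " + runOnWorker("demo-StreamThread-1"));
    }
}
```

In the debugger this means looking for the breakpoint hit under a worker thread in the Threads view rather than under main.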
