Error handling in Spring Cloud Kafka Streams
I'm using Spring Cloud Stream with Kafka Streams. Let's say I have a processor, a Function, which converts a KStream of Strings to a KStream of CityProgrammes. It invokes an API to find the City by name, and another transformation which finds any events near that city.

Now the problem is that if any error happens during the transformation, the whole application stops. I want to send that one particular message to a DLQ and move on. I've been reading for days, and everyone suggests handling errors within the called services, but that is nonsense in my opinion; besides, I still need to return a KStream: how do I do that from within a catch block?

I also looked at the UncaughtExceptionHandler, but it is not aware of the message and can only restart the processing, which won't skip this invalid message.

This might sound like an XY problem, so here is the question rephrased: how do I keep the flow of a KStream going when an exception occurs, while sending the invalid record to the DLQ?
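To make the constraint concrete, here is a minimal plain-Java analogue using java.util.stream (no Kafka involved; the class name, the `lookup` stand-in, and the `deadLetter` list are all hypothetical). A map step must return a value for every element, so a catch block inside it has nothing sensible to return; routing the bad record aside with a filter first avoids the problem entirely:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class MapCannotSkip {
    // Stand-in for the API call that converts a city name to a programme;
    // throws for an unknown city, just as the real lookup might.
    static String lookup(String city) {
        if (city.equals("atlantis")) {
            throw new IllegalArgumentException("unknown city: " + city);
        }
        return "programme-for-" + city;
    }

    // Filter failures out (recording them) before mapping, so map() never
    // has to invent a return value inside a catch block. The lookup runs
    // twice per good record here; a sketch, not an optimization.
    static List<String> convert(List<String> input, List<String> deadLetter) {
        return input.stream()
                .filter(city -> {
                    try {
                        lookup(city);          // probe the conversion
                        return true;           // good record stays in the flow
                    } catch (Exception e) {
                        deadLetter.add(city);  // stand-in for sending to a DLQ
                        return false;          // bad record leaves the flow
                    }
                })
                .map(MapCannotSkip::lookup)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> deadLetter = new ArrayList<>();
        List<String> out = convert(List.of("paris", "atlantis", "rome"), deadLetter);
        System.out.println(out);        // [programme-for-paris, programme-for-rome]
        System.out.println(deadLetter); // [atlantis]
    }
}
```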

Erichericha answered 25/8, 2021 at 21:17 Comment(0)

When it comes to application-level errors like yours, it is up to the application itself how the error is handled. Kafka Streams and the Spring Cloud Stream binder mainly support deserialization and serialization errors at the framework level. That said, I think your scenario can be handled. If you are using a Kafka client prior to 2.8, here is an SO answer I gave on something similar: https://mcmap.net/q/1916093/-how-to-stop-sending-to-kafka-topic-when-control-goes-to-catch-block-functional-kafka-spring

If you are using Kafka Streams 2.8, here is an idea you can use. The code below should only be used as a starting point; adjust it to your use case. Read up on how branching works in Kafka Streams 2.8, as the branching API was significantly refactored in 2.8 from the prior versions.

public Function<KStream<?, String>, KStream<?, Foo>> convert() {
    // Single-element holder so the branch predicate can hand the converted
    // value to the map() below; it must have length 1, not 0.
    Foo[] foo = new Foo[1];
    return input -> {
        final Map<String, ? extends KStream<?, String>> branches =
                input.split(Named.as("foo-"))
                        .branch((key, value) -> {
                            try {
                                foo[0] = new Foo(); // your API call for the CityProgramme conversion here, possibly
                                return true;
                            }
                            catch (Exception e) {
                                Message<?> message = MessageBuilder.withPayload(value).build();
                                streamBridge.send("to-my-dlt", message);
                                return false;
                            }
                        }, Branched.as("bar"))
                        .defaultBranch();

        final KStream<?, String> kStream = branches.get("foo-bar");
        return kStream.map((key, value) -> new KeyValue<>("", foo[0]));
    };
}

The default branch is ignored in this code because it only contains the records that threw exceptions; those were already handled by the catch block above, which sends them to a DLT programmatically. Finally, we take the good records, map them to a new KStream, and send that through the outbound.
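The routing logic can be sketched in plain Java without any Kafka types (a minimal analogue, not the binder's API; the `split` helper, branch names, and the `dlt` list standing in for `streamBridge.send` are all illustrative). A predicate with a try/catch decides which branch each record lands in, mirroring the code above:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class BranchSketch {
    // Minimal analogue of KStream#split: partition records by a predicate
    // into a named branch ("foo-" + "bar") and a default branch.
    static Map<String, List<String>> split(List<String> records, Predicate<String> ok) {
        Map<String, List<String>> branches = new HashMap<>();
        branches.put("foo-bar", new ArrayList<>()); // convertible records
        branches.put("foo-0", new ArrayList<>());   // default branch (ignored)
        for (String r : records) {
            branches.get(ok.test(r) ? "foo-bar" : "foo-0").add(r);
        }
        return branches;
    }

    public static void main(String[] args) {
        List<String> dlt = new ArrayList<>();
        // The predicate attempts the conversion; on failure it records the
        // message (stand-in for streamBridge.send("to-my-dlt", ...)) and
        // routes the record to the default branch by returning false.
        Predicate<String> convertible = value -> {
            try {
                if (value.startsWith("bad")) {
                    throw new IllegalStateException("conversion failed");
                }
                return true;
            } catch (Exception e) {
                dlt.add(value);
                return false;
            }
        };
        Map<String, List<String>> branches =
                split(List.of("a", "bad-1", "b"), convertible);
        System.out.println(branches.get("foo-bar")); // [a, b]
        System.out.println(dlt);                     // [bad-1]
    }
}
```

The good branch then continues through the rest of the topology, while the failed records have already been published to the DLT as a side effect of the predicate.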

Cosmism answered 25/8, 2021 at 22:4 Comment(5)
This answer is a couple of years old now, so I just wanted to check whether exception handling has moved on at all. Is it still the case that application errors need to be handled within the specific function? We have a stream that requires various filter and map functions, so the exception handling needs to be done within each one of these. Is this correct?Undergrown
That is still the case with Kafka Streams. It is completely up to the application code how errors from business logic need to be handled.Cosmism
@Cosmism is this still the case another two years later? I just posted a question elsewhere, and it seems like having a DLQ for KeyValueMapper/Transformers/Processors would be a useful addition and would allow for cleaner streams code.Constriction
Could you create an issue in github.com/spring-cloud/spring-cloud-stream/issues and elaborate on this use case? We can certainly look into it then. Thanks!Cosmism
Published here! github.com/spring-cloud/spring-cloud-stream/issues/2779 Let me know if I did a bad job explaining myself. The following SO question asks whether the issue I linked already exists and I just can't locate it: #76696694Constriction

© 2022 - 2025 — McMap. All rights reserved.