How to skip a record that produces a runtime exception in Kafka Streams and keep the stream running?

I have implemented a Kafka Streams application. Let's say one of the fields of the object the stream is currently processing contains a number instead of a string value. Currently, when an exception is thrown in the processing logic, e.g. in the .transform() method, the whole stream is killed and my application stops processing data.

I would like to skip such an invalid record and keep processing the next records available on the input topic. Additionally, I don't want to implement any try-catch statements in my stream processing code.

To achieve this, I implemented a StreamsUncaughtExceptionHandler that returns the StreamThreadExceptionResponse.REPLACE_THREAD enum value in order to spawn a new thread and keep processing the next records waiting on the input topic. However, it turned out that the stream's consumer offset is not committed, so when the new thread starts, it picks up the very record that just killed the previous stream thread... Since the logic is the same, the new thread also fails on that record, and so on: a loop that spawns a new thread and fails on the same record every time.
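
For reference, a minimal sketch of the handler setup described above (Kafka Streams 2.8+ API; topology and props are assumed to be defined elsewhere):

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.setUncaughtExceptionHandler(exception ->
            // spawns a replacement thread, but the offset of the failing
            // record is never committed, so the new thread re-reads it
            StreamThreadExceptionResponse.REPLACE_THREAD);
    streams.start();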

Is there any clean way of skipping the failing record and keeping the stream processing the next records?

Please note, I am not asking about DeserializationExceptionHandler or ProductionExceptionHandler.

Sundberg answered 27/1, 2022 at 17:25 Comment(2)
I recommend a try-catch block if possible, which is cleaner than doing expensive operations that create additional overhead in the cluster due to rebalancing.Finite
@Sundberg Have you found an acceptable solution?Wooer

When it comes to application-level code, it is mostly up to the application how the exception is handled. This use case has come up before; see these previous Stack Overflow threads:

Example on handling processing exception in Spring Cloud Streams with Kafka Streams Binder and the functional style processor

How to stop sending to kafka topic when control goes to catch block Functional kafka spring

Try to see if those answers can be applied to your scenario.
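
For illustration, here is one shape that application-level handling can take (a sketch, assuming the Spring Cloud Stream functional style; transformValue is a hypothetical stand-in for your business logic, and failed records are marked with null and filtered out):

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> process() {
        return input -> input
                .mapValues(value -> {
                    try {
                        return transformValue(value); // hypothetical business logic
                    } catch (RuntimeException e) {
                        return null;                  // mark the record as failed
                    }
                })
                .filter((key, value) -> value != null); // drop failed records
    }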

Despondent answered 27/1, 2022 at 22:42 Comment(0)

You can filter out events that don't match a pattern, or validate the events before you transform them.
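
For example (a sketch; isValid and MyTransformer are hypothetical placeholders for your own validation rule and transformer):

    // records that fail validation never reach transform(),
    // so they cannot throw there
    input.filter((key, value) -> isValid(value))
         .transform(MyTransformer::new)
         .to("output-topic");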

Conventionalism answered 27/1, 2022 at 20:48 Comment(1)
I don't want to do that. I don't want to add any additional validation logic or pattern checks, because I can't predict all edge cases. I just want to skip any record that causes an exception in the stream, simple as that.Sundberg

There are other ways than using a try-catch block or filtering. You can use the RetryTemplate from Spring Cloud Stream (the framework creates a Spring RetryTemplate for each binding).

This gives you the option to swallow the exception, or to re-raise the throwable so that the record goes to a DLQ.

Example code below:

return stream -> stream.mapValues(v -> retryTemplate.execute(
        // retry callback: business logic, retried on failure
        retryContext -> {
            // business logic goes here; transformValue(v) is a
            // placeholder for whatever the stream does with the record
            return transformValue(v);
        },
        // recovery callback: runs once the retries are exhausted
        recoveryContext -> {
            // log the last exception and skip the record
            log.info("Skipping record after retries", recoveryContext.getLastThrowable());
            return null;
        }));

This doesn't close down the stream. You can add a custom RetryTemplate to change the defaults of spring-retry.
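
For instance, a custom template might look like this (a sketch using spring-retry's builder; the attempt count and backoff are illustrative):

    RetryTemplate retryTemplate = RetryTemplate.builder()
            .maxAttempts(3)                  // try each record at most three times
            .fixedBackoff(1000)              // wait 1 s between attempts
            .retryOn(RuntimeException.class) // only retry runtime exceptions
            .build();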

Another option is to use the lower-level Processor API (process or transform), which gives you the chance to swallow the exception where it occurs.
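
A sketch of that approach (assuming the Transformer API; doBusinessLogic is a hypothetical placeholder, and returning null from transform() forwards nothing, so the failing record is simply skipped):

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class SkippingTransformer
            implements Transformer<String, String, KeyValue<String, String>> {

        @Override
        public void init(ProcessorContext context) { }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            try {
                return KeyValue.pair(key, doBusinessLogic(value)); // hypothetical logic
            } catch (RuntimeException e) {
                // swallow the exception: the record is skipped
                // and the stream thread keeps running
                return null;
            }
        }

        @Override
        public void close() { }
    }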

I would like @sobychacko to review this, if possible.

Eisenhart answered 31/5, 2023 at 14:45 Comment(0)
