I have implemented a Kafka Streams application. Suppose a field of the object currently being processed contains a number instead of the expected string value. At the moment, when an exception is thrown in the processing logic, e.g. in the `.transform()` method, the whole stream thread is killed and my application stops processing data.
I would like to skip such an invalid record and keep processing the subsequent records available on the input topic. Additionally, I don't want to wrap my stream processing code in try-catch statements.
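For context, the failing step looks roughly like this (the topic names and `MyTransformer` are simplified placeholders, not my actual code):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;

StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
       // transform() runs the business logic; it throws (e.g. a parsing
       // exception) when the field holds a number instead of a string
       .transform(() -> new MyTransformer())
       .to("output-topic");
```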
To achieve this, I implemented a StreamsUncaughtExceptionHandler that returns StreamThreadExceptionResponse.REPLACE_THREAD, so that a new thread is spawned and processing continues with the next records waiting on the input topic. However, it turned out that the stream's consumer offset is not committed, so when the new thread starts, it picks up the very record that just killed the previous stream thread. Since the logic is the same, the new thread also fails on that record, and so on: a loop that spawns a new thread and fails on the same record every time.
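For reference, this is roughly how I registered the handler (against the kafka-streams 2.8+ API; `builder` and `props` are my topology and configuration):

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

KafkaStreams streams = new KafkaStreams(builder.build(), props);
// Replace the dead stream thread whenever processing throws...
streams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
// ...but the offset of the poisonous record was never committed, so the
// replacement thread re-reads and fails on the same record.
streams.start();
```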
Is there a clean way to skip the failing record and keep the stream processing the subsequent records?
Please note, I am not asking about DeserializationExceptionHandler or ProductionExceptionHandler.