Spring Cloud Stream Kafka Stream - How to handle runtime exceptions?

I'm struggling to customize my Spring Kafka Streams application. I have been trying to configure handling of uncaught (runtime) exceptions in my KStreams.

Referring to the documentation (https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_streams_binder), it should be done like this:

@Configuration
@Slf4j
public class CustomKafkaStreamsConfiguration {

    @Bean
    public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
        return factoryBean -> {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
                        log.error("An exception has occurred={}", e.getMessage());
                    });

                }
            });
        };
    }

}

Later on I have a KStream:

    @Bean
    public Function<KStream<String, Transaction>,
            KStream<String, Transaction>> paymentExecution() {
        return stream -> stream
                .peek((k, v) -> {
                    if (v.getStatus().equals(PaymentStatus.UNKNOWN)) {
                        throw new IllegalStateException();
                    }
                });
    }

If I send a Transaction with UNKNOWN status, the StreamThread dies due to the IllegalStateException, and then my KStream no longer consumes any incoming requests.

Exception in thread "payment-d4b6ddd2-40ab-4eeb-afe4-e7fc3caa2b9c-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=payment-request, partition=0, offset=13, stacktrace=java.lang.IllegalStateException
    at payment.process.PaymentExecutionRequestProcessor.lambda$paymentExecution$4(PaymentExecutionRequestProcessor.java:48)
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)

    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:696)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: java.lang.IllegalStateException
    at payment.process.PaymentExecutionRequestProcessor.lambda$paymentExecution$4(PaymentExecutionOrchestratorProcessor.java:48)
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)

Is there something I'm missing? Or is there a more appropriate way to handle runtime exceptions that arise while processing a stream?

After a runtime exception I want to commit the event that caused it, and then I want my StreamThread-1 to be able to keep consuming events.

Shotgun answered 14/1, 2021 at 9:27

I believe you can modify your setUncaughtExceptionHandler to be something like this:

kafkaStreams.setUncaughtExceptionHandler(e -> {
    log.error("An exception has occurred={}", e.getMessage());
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});

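Essentially, REPLACE_THREAD should shut down the failed stream thread and start a new one in its place, so the app/stream is NOT killed and processing continues. This response type belongs to StreamsUncaughtExceptionHandler, which is available from Kafka Streams 2.8. Please see this post for more detail: https://mcmap.net/q/1454594/-correct-way-to-restart-or-shutdown-the-stream-using-uncaughtexceptionhandler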

Periscope answered 28/1, 2022 at 20:26

If the application reaches the UncaughtExceptionHandler, the stream thread has already stopped and it is too late to recover; it won't continue processing. The handler is provided so you can take any graceful shutdown measures before the application exits. You need to restart the application in order to continue processing.

Instead, you can probably use something like the branching feature in Kafka Streams to ignore the records with UNKNOWN status, something like below (I haven't tested this, but it should work). The idea is to create a branch for the incoming KStream so that records with UNKNOWN status are filtered out and ignored rather than causing an exception. It is still possible to log the unknown ones and add any additional logic.

    @Bean
    public Function<KStream<String, Transaction>,
            KStream<String, Transaction>[]> paymentExecution() {
        return stream -> stream
                // Keep only records whose status is not UNKNOWN; the rest are dropped.
                .branch((k, v) -> !v.getStatus().equals(PaymentStatus.UNKNOWN));
    }
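
Filtering works when the bad records can be recognized up front. For failures that only show up while the record is being processed, a related approach is to catch the exception inside the processing step itself, so it never reaches the stream thread. The following is a minimal plain-Java sketch of that idea, not Kafka Streams or Spring Cloud Stream API: `SafeProcessing`, `catching`, `execute` and `processAll` are hypothetical names, and string statuses stand in for the `Transaction` type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of Kafka Streams or Spring.
final class SafeProcessing {

    private SafeProcessing() {}

    /**
     * Wraps a function so that a RuntimeException becomes an empty Optional
     * instead of propagating. The failed input and the exception are handed
     * to onError (for logging, metrics, and so on). A null result from the
     * delegate is also mapped to an empty Optional.
     */
    static <T, R> Function<T, Optional<R>> catching(
            Function<T, R> delegate, BiConsumer<T, RuntimeException> onError) {
        return input -> {
            try {
                return Optional.ofNullable(delegate.apply(input));
            } catch (RuntimeException e) {
                onError.accept(input, e);
                return Optional.empty();
            }
        };
    }

    /** Stand-in for the per-record logic; fails on "UNKNOWN" like the peek in the question. */
    static String execute(String status) {
        if (status.equals("UNKNOWN")) {
            throw new IllegalStateException("unsupported status: " + status);
        }
        return "processed:" + status;
    }

    /** Applies the wrapped logic to every input and keeps only the successes. */
    static List<String> processAll(List<String> statuses, List<String> failures) {
        Function<String, Optional<String>> safe =
                catching(SafeProcessing::execute, (in, e) -> failures.add(in));
        return statuses.stream()
                .map(safe)
                .flatMap(Optional::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> failures = new ArrayList<>();
        List<String> ok = processAll(List.of("NEW", "UNKNOWN", "PAID"), failures);
        // prints: [processed:NEW, processed:PAID] failed=[UNKNOWN]
        System.out.println(ok + " failed=" + failures);
    }
}
```

In a KStream topology the same wrapper could plausibly back a `flatMapValues` step that emits zero records when the wrapped logic fails, so the failing record is skipped and its offset is committed along with the rest.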
Elude answered 15/1, 2021 at 20:42 Comment(2)
So there is no elegant way to recover from an exception? I mean, runtime exceptions can occur through developer mistakes or whatever. The example with IllegalStateException was contrived, just to test the behavior of UncaughtExceptionHandler, which, as you explained, doesn't work that way. So there is no nice way to recover from these cases, and the only way is to restart the application? Well... that sounds like some kind of edge case :( - Shotgun
Also, could you take a look, when you have some free time, at my other question regarding logging incoming data? #65684290 - Shotgun

You can send the failures to a DLQ or configure a RetryTemplate to handle the exceptions as shown in https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.0.10.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#_error_handling
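
To make the retry-then-DLQ idea concrete, here is a minimal plain-Java sketch of the control flow. It is only an illustration and does not use the Spring RetryTemplate or the binder's DLQ support: `RetryWithDeadLetter` and `process` are hypothetical names, an in-memory list stands in for the dead-letter topic, and string statuses stand in for real records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of retry-then-dead-letter; not a Spring API.
final class RetryWithDeadLetter {

    private RetryWithDeadLetter() {}

    /**
     * Calls handler up to maxAttempts times. If every attempt throws a
     * RuntimeException, the record is added to deadLetters and the caller
     * can move on to the next record. Returns true if an attempt succeeded.
     */
    static <T> boolean process(T record, Consumer<T> handler, int maxAttempts, List<T> deadLetters) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(record);
                return true;
            } catch (RuntimeException e) {
                // A real setup would usually log the failure and back off here.
            }
        }
        deadLetters.add(record);
        return false;
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        List<String> handled = new ArrayList<>();
        Consumer<String> handler = status -> {
            if (status.equals("UNKNOWN")) {
                throw new IllegalStateException(status);
            }
            handled.add(status);
        };
        for (String status : List.of("NEW", "UNKNOWN", "PAID")) {
            process(status, handler, 3, deadLetters);
        }
        // prints: handled=[NEW, PAID] deadLetters=[UNKNOWN]
        System.out.println("handled=" + handled + " deadLetters=" + deadLetters);
    }
}
```

The point of the design is that a record that keeps failing ends up somewhere it can be inspected later, while the records after it are still processed.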

Hest answered 25/6, 2021 at 7:49