I'm struggling with customization of my spring kafka streams application. I have been trying to configure handling uncaught (runtime exceptions) at my KStreams.
Refering to documentation https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_streams_binder - it should be done like that:
@Configuration
@Slf4j
public class CustomKafkaStreamsConfiguration {
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
log.error("An exception has occurred={}", e.getMessage()) ;
});
}
});
};
}
}
Later on I have a KStream
@Bean
public Function<KStream<String, Transaction>,
KStream<String, Transaction>> paymentExecution() {
return stream -> stream
.peek((k, v) -> {
if (v.getStatus().equals(PaymentStatus.UNKNOWN)) {
throw new IllegalStateException();
}
});
}
If I send Transaction with UNKNOWN status StreamThread dies due to IllegalStateException and then my KStream is not consuming anymore any incomming requests.
Exception in thread "payment-d4b6ddd2-40ab-4eeb-afe4-e7fc3caa2b9c-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=payment-request, partition=0, offset=13, stacktrace=java.lang.IllegalStateException
at payment.process.PaymentExecutionRequestProcessor.lambda$paymentExecution$4(PaymentExecutionRequestProcessor.java:48)
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:696)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: java.lang.IllegalStateException
at payment.process.PaymentExecutionRequestProcessor.lambda$paymentExecution$4(PaymentExecutionOrchestratorProcessor.java:48)
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
Is there something that I miss? Or maybe there is more appropriate to handle runtime exceptions that rise while processing stream?
After runtime exception I want commit that exception-event, and then I want my StreamThread-1 to be able to still consume events.
UncaughtExcpetionHandler
which as you explained doesn't work in that way. So there is no any nice way to recover from these cases, the only way is to restart an application? Well... that sound like some kind of edge case :( – Shotgun