Example of handling processing exceptions in Spring Cloud Stream with the Kafka Streams Binder and functional-style processors

I am using Spring Cloud Stream with the Kafka Streams Binder, the functional-style processor API, and multiple processors.

It's really cool to configure a processing application with multiple processors and multiple Kafka topics this way while staying in the Spring Boot universe with /actuator, WebClient and so on. Actually I like it more than using plain Apache Kafka Streams.

BUT: I would like to integrate exception handling for exceptions occurring within the processors and send these unprocessable messages to a DLQ. I have already set up DLQs for deserialization errors, but I found no good advice on achieving this besides sobychacko's answer on a similar question. But this is only a snippet! Does anybody have a more detailed example? I am asking this because the Spring Cloud Stream documentation on branching looks quite different.

Cozenage answered 29/9, 2021 at 17:16 Comment(0)

Glad to hear about your usage of Spring Cloud Stream with Kafka Streams.

The reference docs you mentioned are from an older release. Please navigate to the newer docs from this page: https://spring.io/projects/spring-cloud-stream#learn

This question has come up before. See if these could help with your use case:

Error handling in Spring Cloud Kafka Streams

How to stop sending to kafka topic when control goes to catch block Functional kafka spring

Haircloth answered 29/9, 2021 at 18:30 Comment(1)
I have checked the links. Thanks a lot for these! I will post my code below in case somebody needs it. – Cozenage

I integrated the code from sobychacko's links into my code. I only had to add org.springframework.cloud:spring-cloud-stream-binder-kafka as a dependency because of the StreamBridge. Up to now I had only used the org.springframework.cloud:spring-cloud-stream-binder-kafka-streams binder.

UPDATE: I have now switched from the .branch() solution of the links to the easier .filter() code - see below.
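For reference, a rough sketch of how such a branch()-based variant can look (this is only an approximation, not the exact code from the links; processor1 and handleProcessingException are the same bean and helper method as in the full EventProcessor class further below):

@Bean
public Function<KStream<String, Message1>, KStream<String, Message2>> process1Branching() {

    return input -> {
        // Process the record inside map(); on failure report the error immediately and emit
        // a null value, then let branch() separate successful from failed records.
        // Note: branch() is deprecated in newer Kafka Streams versions in favor of split().
        KStream<String, Message2>[] branches = input
            .map((messageKey, messageValue) -> {
                try {
                    return processor1.streamProcess(messageKey, messageValue);
                } catch (Exception exception) {
                    handleProcessingException("process1", messageKey, messageValue, exception);
                    return KeyValue.pair(messageKey, (Message2) null);
                }
            })
            .branch(
                (messageKey, messageValue) -> messageValue != null, // branch 0: successfully processed records
                (messageKey, messageValue) -> true);                // branch 1: failed records, already reported
        return branches[0];
    };
}

In the end I found the filter() version below simpler, because it needs neither null placeholder records nor the extra branch handling.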

My sample code for two processors working on 3 topics (with 2 runtime error topics and 2 deserialization DLQ topics) is below. The DocumentedErrorOutput class combines key, message and stacktrace and is written to an error topic.

spring:
  application:
    name: scs-sample
  kafka:
    bootstrap-servers: 'localhost:9092'
    jaas:
      enabled: false
    security:
      protocol: PLAINTEXT
    properties:
      sasl:
        mechanism: PLAIN
  cloud:
    # which steps are performed?
    function:
      definition: 'process1;process2'
    stream:
      instance-count: 1 # for local development 1 (is Default)
      bindings:
        # PROCESSOR process1 - - - - - - - - - - - - - - - - - - - -
        process1-in-0:
          destination: ${application.topic.topic1}
          consumer: # consumer properties on each function (processor) level
            concurrency: 1 # See "2.19.4 Special note on concurrency" - translated to num.stream.threads
            ackEachRecord: true # commit the offset after each single record is processed
            standardHeaders: timestamp
        process1-out-0:
          destination: ${application.topic.topic2}
        process1-out-error:
          destination: ${application.topic.topic1Error}
        # PROCESSOR process2 - - - - - - - - - - - - - - - - - - - -
        process2-in-0:
          destination: ${application.topic.topic2}
          consumer: # consumer properties on each function (processor) level
            concurrency: 1 # See "2.19.4 Special note on concurrency" - translated to num.stream.threads
            ackEachRecord: true # commit the offset after each single record is processed
            standardHeaders: timestamp
        process2-out-0:
          destination: ${application.topic.topic3}
        process2-out-error:
          destination: ${application.topic.topic2Error}
      kafka:
        streams:
          binder:
            auto-create-topics: false # We do not want topic auto creation (is disabled in our cluster)
            functions:
              # if we use multiple processors, the application-id MUST BE SET (and unique in cluster)!
              process1:
                application-id: ${application.id.process1}
                # configuration:
                  # Define producer exception handler (can be done here on process level, but easier in Java)
                  # default.production.exception.handler: de.datev.pws.loon.dcp.exceptions.CustomProductionExceptionHandler
              process2:
                application-id: ${application.id.process2}
                # configuration:
                  # Define producer exception handler (can be done here on process level, but easier in Java)
                  # default.production.exception.handler: de.datev.pws.loon.dcp.exceptions.CustomProductionExceptionHandler
            # we use DLQs if the JSON cannot be deserialized
            deserialization-exception-handler: sendtodlq
            required-acks: -1  # all in-sync-replicas
            replication-factor: 1 # our replication factor in local dev mode
            configuration: # Standard Kafka Streams configuration properties (consumer and producer) on binder level
              commit.interval.ms: 1000 # the definition here at the binder level works
              default:
                # Define producer exception handler (can be done here globally, but easier in Java)
                # production.exception.handler: de.datev.pws.loon.dcp.exceptions.CustomProductionExceptionHandler
                key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            # producer-properties: # Standard Kafka Streams configuration properties (producer only) on binder level
            # consumer-properties: # Standard Kafka Streams configuration properties (consumer only) on binder level
          bindings:
            process1-in-0:
              consumer: # consumer properties on each function's input level
                # Must be defined. Otherwise error.<input-topic-name>.<application-id> is used
                dlqName: ${application.topic.topic1Dlq}
            process1-out-0:
              producer: # producer properties on each function's output level
                streamPartitionerBeanName: streamPartitionerTopic2
            process2-in-0:
              consumer: # consumer properties on each function's input level
                # Must be defined. Otherwise error.<input-topic-name>.<application-id> is used
                dlqName: ${application.topic.topic2Dlq}
            process2-out-0:
              producer: # producer properties on each function's output level
                streamPartitionerBeanName: streamPartitionerTopic3

management:
  endpoints:
    web:
      exposure:
        include: ['health', 'info', 'bindings', 'logfile', 'metrics', 'configprops', 'env', 'kafkastreamstopology']
  endpoint:
    health:
      show-details: ALWAYS
  health:
    binders:
      enabled: true # is default

application:
  topic:
    topic1: topic1
    topic1Error: topic1.err
    topic1Dlq: topic1.dlq
    topic2: topic2
    topic2Error: topic2.err
    topic2Dlq: topic2.dlq
    topic3: topic3
  id:
    # the id is used as the prefix for changelog (KTable) topics, so it has to be configurable
    process1: process1
    process2: process2
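The commented-out default.production.exception.handler entries above refer to a CustomProductionExceptionHandler. That class is not part of this answer; a minimal sketch of such a handler (it just logs the record that could not be produced and lets the stream continue; returning FAIL instead would stop the stream thread) could look like this. It is activated by uncommenting the configuration lines above.

package de.datev.pws.loon.dcp.exceptions;

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: log the failed record and keep the stream running.
public class CustomProductionExceptionHandler implements ProductionExceptionHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(CustomProductionExceptionHandler.class);

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
        LOGGER.error(">>> Production error for topic={} partition={}", record.topic(), record.partition(), exception);
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no additional configuration needed for this sketch
    }
}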
package de.datev.pws.loon.dcp.processor;

import de.datev.pws.loon.dcp.exceptions.DocumentedErrorOutput;
import de.datev.pws.loon.dcp.model.Message1;
import de.datev.pws.loon.dcp.model.Message2;
import de.datev.pws.loon.dcp.model.Message3;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

@Component
public class EventProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);

    private final StreamBridge streamBridge;
    private final Processor1 processor1;
    private final Processor2 processor2;

    public EventProcessor(StreamBridge streamBridge, Processor1 processor1, Processor2 processor2) {

        this.streamBridge = streamBridge;
        this.processor1 = processor1;
        this.processor2 = processor2;
    }

    @Bean
    public Function<KStream<String, Message1>, KStream<String, Message2>> process1() {

        // The KeyValue produced inside filter() is handed over to the following map() step via
        // this AtomicReference; filter and map are executed per record on the same stream thread.
        final AtomicReference<KeyValue<String, Message2>> result = new AtomicReference<>(null);
        return input -> input
            .filter(
                (messageKey, messageValue) -> {
                    try { // Call the processor within try/catch
                        result.set(processor1.streamProcess(messageKey, messageValue));
                        return true; // We are done and signal with true that the flow has an output
                    } catch (Exception exception) {
                        handleProcessingException("process1", messageKey, messageValue, exception);
                        return false; // We have sent the error and signal with false that there is no output
                    }
                })
            .map((messageKey, messageValue) -> result.get());
    }

    @Bean
    public Function<KStream<String, Message2>, KStream<String, Message3>> process2() {

        final AtomicReference<KeyValue<String, Message3>> result = new AtomicReference<>(null);
        return input -> input
            .filter(
                (messageKey, messageValue) -> {
                    try { // Call the processor within try/catch
                        result.set(processor2.streamProcess(messageKey, messageValue));
                        return true;  // We are done and signal with true that the flow has an output
                    } catch (Exception exception) {
                        handleProcessingException("process2", messageKey, messageValue, exception);
                        return false; // We have sent the error and signal with false that there is no output
                    }
                })
            .map((messageKey, messageValue) -> result.get());
    }

    protected void handleProcessingException(String processName, String messageKey, Object messageValue, Exception exception) {

        final DocumentedErrorOutput documentedErrorOutput = new DocumentedErrorOutput(messageKey, messageValue, exception);
        final Message<DocumentedErrorOutput> documentedErrorOutputMessage = MessageBuilder.withPayload(documentedErrorOutput).build();
        final String bindingName = processName + "-out-error";
        LOGGER.error(">>> EXCEPTION in process {} for message={}! Sending problem to out binding \"{}\".", processName, messageValue, bindingName, exception);
        streamBridge.send(bindingName, documentedErrorOutputMessage);
    }

    // Some samples for partitioning

    @Bean
    public StreamPartitioner<String, Message2> streamPartitionerTopic2() {

        LOGGER.info("Performing StreamPartitioner setup for topic2/Message2 using EventProcessor.streamPartitioner");
        return (topicName, key, value, totalPartitions) -> {
            // Math.floorMod avoids a negative partition when key.hashCode() is Integer.MIN_VALUE
            final int partition = key != null ? Math.floorMod(key.hashCode(), totalPartitions) : 0;
            LOGGER.info(">>> streamPartitionerTopic2 topicName={} totalPartitions={} key={} partition={}", topicName, totalPartitions, key, partition);
            return partition;
        };
    }

    @Bean
    public StreamPartitioner<String, Message3> streamPartitionerTopic3() {

        LOGGER.info("Performing StreamPartitioner setup for topic3/Message3 using EventProcessor.streamPartitioner");
        return (topicName, key, value, totalPartitions) -> {
            // Math.floorMod avoids a negative partition when key.hashCode() is Integer.MIN_VALUE
            final int partition = key != null ? Math.floorMod(key.hashCode(), totalPartitions) : 0;
            LOGGER.info(">>> streamPartitionerTopic3 topicName={} totalPartitions={} key={} partition={}", topicName, totalPartitions, key, partition);
            return partition;
        };
    }
}
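
The DocumentedErrorOutput payload used by handleProcessingException is not included above; a minimal sketch (the field names are only an example) could be:

package de.datev.pws.loon.dcp.exceptions;

import java.io.PrintWriter;
import java.io.StringWriter;

// Sketch only: combines key, message and stacktrace so the whole problem ends up
// as one JSON document in the error topic.
public class DocumentedErrorOutput {

    private final String key;
    private final Object message;
    private final String stacktrace;

    public DocumentedErrorOutput(String key, Object message, Exception exception) {
        this.key = key;
        this.message = message;
        final StringWriter stringWriter = new StringWriter();
        exception.printStackTrace(new PrintWriter(stringWriter));
        this.stacktrace = stringWriter.toString();
    }

    public String getKey() { return key; }

    public Object getMessage() { return message; }

    public String getStacktrace() { return stacktrace; }
}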

UPDATE:

Example code of a processor that multiplies a value:

package de.datev.pws.loon.dcp.processor;

import de.datev.pws.loon.dcp.model.Message2;
import de.datev.pws.loon.dcp.model.Message3;
import org.apache.kafka.streams.KeyValue;
import org.springframework.stereotype.Component;

@Component
public class Processor2 {

    public KeyValue<String, Message3> streamProcess(String key, Message2 message2) {

        Message3 message3 = Message3.builder()
            .requestId(message2.getRequestId())
            .startTime(message2.getStartTime())
            .calculatedValue2(message2.getCalculatedValue1() * 2)
            .build();
        return new KeyValue<>(key, message3);
    }
}
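
And for a complete picture, a matching Processor1 could look like this (the Message1 fields, the builder on Message2 and the validation rule are just assumptions for illustration; any exception thrown here is caught in process1() and ends up in topic1.err):

package de.datev.pws.loon.dcp.processor;

import de.datev.pws.loon.dcp.model.Message1;
import de.datev.pws.loon.dcp.model.Message2;
import org.apache.kafka.streams.KeyValue;
import org.springframework.stereotype.Component;

// Illustration only: getInputValue(), getRequestId(), getStartTime() on Message1 and the
// builder on Message2 are assumptions, not the real model classes.
@Component
public class Processor1 {

    public KeyValue<String, Message2> streamProcess(String key, Message1 message1) {

        if (message1.getInputValue() < 0) {
            // Any unchecked exception thrown here is caught in process1() and sent to
            // the process1-out-error binding (topic1.err) via StreamBridge.
            throw new IllegalArgumentException("inputValue must not be negative: " + message1.getInputValue());
        }
        Message2 message2 = Message2.builder()
            .requestId(message1.getRequestId())
            .startTime(message1.getStartTime())
            .calculatedValue1(message1.getInputValue() + 1)
            .build();
        return new KeyValue<>(key, message2);
    }
}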
Cozenage answered 30/9, 2021 at 12:52 Comment(2)
Do you have a link to the full implementation of the above solution? Like how you defined the process1 and process2 processors/functions? – Jedthus
I have added an example of the processor functions in the answer, as requested by @Jedthus. – Cozenage
