How to stop sending to a Kafka topic when control goes to the catch block (functional Kafka Spring)

Could you please advise how I can stop sending to my third Kafka topic when control reaches the catch block? Currently the message is sent to both the error topic and the topic it should go to under normal processing. A snippet of the code is below:

@Component
public class Abc {

    private final StreamBridge streamBridge;

    public Abc(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @Bean
    public Function<KStream<String, KafkaClass>, KStream<String, KafkaClass>> hiProcess() {
        return input -> input.map((key, value) -> {
            KafkaClass stream = null;
            try {
                stream = processFunction();
            }
            catch (Exception e) {
                Message<KafkaClass> mess = MessageBuilder.withPayload(value).build();
                streamBridge.send("errProcess-out-0", mess);
            }
            // This still runs after the catch block, so the record is forwarded
            // to the normal output topic even when processing failed.
            return new KeyValue<>(key, stream);
        });
    }
}

Classicize answered 22/3, 2021 at 14:59 Comment(0)

This can be implemented using the following pattern:

// 'stream' is an instance field on the component (see the note below);
// a local variable could not be reassigned inside the lambda.
KafkaClass stream;

return input -> input
    .branch((k, v) -> {
            try {
                stream = processFunction();
                return true;
            }
            catch (Exception e) {
                Message<KafkaClass> mess = MessageBuilder.withPayload(v).build();
                streamBridge.send("errProcess-out-0", mess);
                return false;
            }
        },
        (k, v) -> true)[0]
    .map((k, v) -> new KeyValue<>(k, stream));

Here, we are using the branching feature (API) of KStream to split your input into two paths: the normal flow and the one causing the errors. This is accomplished by providing two predicates to the branch method call. The first predicate covers the normal flow, in which you call the processFunction method and get a response back. If no exception is thrown, the predicate returns true, and the records it matches are captured in the first element of the output array ([0]), which is processed downstream in the map operation that sends the final result to the outbound topic.

On the other hand, if processFunction throws an exception, the catch block sends whatever is necessary to the error topic using StreamBridge and the predicate returns false. Since the downstream map operation is only performed on the first element of the branching array ([0]), nothing is sent outbound. When the first predicate returns false, the record falls through to the second predicate, which always returns true; this is a no-op branch whose results are simply ignored.
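
Note that "errProcess-out-0" is a binding name rather than a topic name, and it is usually mapped to a concrete topic in the application configuration. A minimal sketch, assuming application.properties and an illustrative topic name (not part of the original question):

spring.cloud.stream.bindings.errProcess-out-0.destination=hiProcess-error-topic

If no such binding is configured, StreamBridge typically resolves the given name itself as the destination.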

One downside of this particular implementation is that you need to store the response from processFunction in an instance field and then mutate it on each incoming KStream record so that you can access its value in the final map call where you send the output. However, for this particular use case, this may not be an issue.
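
If that shared mutable field is a concern, a stateless variant is also possible with flatMap, which can emit exactly one record on success and none on failure. The following is only a sketch built from the names in the question (KafkaClass, processFunction, the errProcess-out-0 binding), not the code from this answer; it assumes java.util.List/ArrayList and the same StreamBridge field as above:

@Bean
public Function<KStream<String, KafkaClass>, KStream<String, KafkaClass>> hiProcess() {
    return input -> input.flatMap((key, value) -> {
        List<KeyValue<String, KafkaClass>> out = new ArrayList<>();
        try {
            // Success: forward exactly one record to the normal output topic.
            out.add(KeyValue.pair(key, processFunction()));
        }
        catch (Exception e) {
            // Failure: route the original payload to the error binding and emit nothing,
            // so nothing reaches the normal output topic.
            streamBridge.send("errProcess-out-0", MessageBuilder.withPayload(value).build());
        }
        return out;
    });
}

Because the output record is computed inside the lambda, no instance field is needed.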

Fariss answered 22/3, 2021 at 16:8 Comment(4)
This is great. Thank you v much!!Classicize
@sobychacko, i am using the same code as above in Kafka 3.0.0 and getting "Broker may not be available".Avaavadavat
Could you create a minimal-sample so we can verify?Fariss
@sobychacko, i am able to resolve this problem by configuring the Spring producer settings ie. cloud.stream.kafka.binder.configuration etc. is there any better way we can send the error message to error Topic in Spring cloud stream? ie ProcessorAvaavadavat
