This can be implemented using the following pattern:
KafkaClass stream;
return input -> input
.branch((k, v) -> {
try {
stream = processFunction();
return true;
}
catch (Exception e) {
Message<KakfaClass> mess = MessageBuilder.withPayload(value).build();
streamBridge.send("errProcess-out-0". mess);
return false;
}
},
(k, v) -> true)[0]
.map((k, v) -> new KeyValue<>(k, stream));
Here, we are using the branching feature (API) of KStream
to split your input into two paths - normal flow and the one causing the errors. This is accomplished by providing two filters to the branch
method call. The first filter is the normal flow in which you call the processFunction
method and get a response back. If we don't get an exception, the filter returns true
, and the result of the branch operation is captured in the first element of the output array [0]
which is processed downstream in the map
operation in which it sends the final result to the outbound topic.
On the other hand, if it throws an exception, it sends whatever is necessary to the error topic using StreamBridge
and the filter returns false
. Since the downstream map
operation is only performed on the first element of the array from branching [0]
, nothing will be sent outbound. When the first filter returns false
, it goes to the second filter which always returns true
. This is a no-op filter where the results are completely ignored.
One downside of this particular implementation is that you need to store the response from processFunction
in an instance field and then mutate on each incoming KStream
record so that you can access its value in the final map
method where you send the output. However, for this particular use case, this may not be an issue.