I integrated the code from sobychako's links into my code. The only thing I had to add was org.springframework.cloud:spring-cloud-stream-binder-kafka
as a dependency, because of the StreamBridge. Until then I had used only the org.springframework.cloud:spring-cloud-stream-binder-kafka-streams
binder.
UPDATE:
I have now switched from the .branch()
solution in the links to the simpler .filter()
code - see below.
My sample code for two processors working on 3 topics (with 2 runtime error topics and 2 deserialization error topics) is below. The DocumentedErrorOutput
class combines key, message and stack trace and is written to an error topic.
spring:
application:
name: scs-sample
kafka:
bootstrap-servers: 'localhost:9092'
jaas:
enabled: false
security:
protocol: PLAINTEXT
properties:
sasl:
mechanism: PLAIN
cloud:
# which steps are performed?
function:
definition: 'process1;process2'
stream:
instance-count: 1 # for local development 1 (is Default)
bindings:
# PROCESSOR process1 - - - - - - - - - - - - - - - - - - - -
process1-in-0:
destination: ${application.topic.topic1}
consumer: # consumer properties on each function (processor) level
concurrency: 1 # See "2.19.4 Special note on concurrency" - translated to num.stream.threads
ackEachRecord: true # commit the offset after each single record is processed
standardHeaders: timestamp
process1-out-0:
destination: ${application.topic.topic2}
process1-out-error:
destination: ${application.topic.topic1Error}
# PROCESSOR process2 - - - - - - - - - - - - - - - - - - - -
process2-in-0:
destination: ${application.topic.topic2}
consumer: # consumer properties on each function (processor) level
concurrency: 1 # See "2.19.4 Special note on concurrency" - translated to num.stream.threads
ackEachRecord: true # commit the offset after each single record is processed
standardHeaders: timestamp
process2-out-0:
destination: ${application.topic.topic3}
process2-out-error:
destination: ${application.topic.topic2Error}
kafka:
streams:
binder:
auto-create-topics: false # We do not want topic auto creation (is disabled in our cluster)
functions:
# if we use multiple processors, the application-id MUST BE SET (and unique in cluster)!
process1:
application-id: ${application.id.process1}
# configuration:
# Define producer exception handler (can be done here on process level, but easier in Java)
# default.production.exception.handler: de.datev.pws.loon.dcp.exceptions.CustomProductionExceptionHandler
process2:
application-id: ${application.id.process2}
# configuration:
# Define producer exception handler (can be done here on process level, but easier in Java)
# default.production.exception.handler: de.datev.pws.loon.dcp.exceptions.CustomProductionExceptionHandler
# we use DLQs if the JSON cannot be deserialized
deserialization-exception-handler: sendtodlq
required-acks: -1 # all in-sync-replicas
replication-factor: 1 # our replication factor in local dev mode
configuration: # Standard Kafka Streams configuration properties (consumer and producer) on binder level
commit.interval.ms: 1000 # the definition here at the binder level works
default:
# Define producer exception handler (can be done here globally, but easier in Java)
# production.exception.handler: de.datev.pws.loon.dcp.exceptions.CustomProductionExceptionHandler
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
# producer-properties: # Standard Kafka Streams configuration properties (producer only) on binder level
# consumer-properties: # Standard Kafka Streams configuration properties (consumer only) on binder level
bindings:
process1-in-0:
consumer: # consumer properties on each function's input level
# Must be defined. Otherwise error.<input-topic-name>.<application-id> is used
dlqName: ${application.topic.topic1Dlq}
process1-out-0:
producer: # producer properties on each function's output level
streamPartitionerBeanName: streamPartitionerTopic2
process2-in-0:
consumer: # consumer properties on each function's input level
# Must be defined. Otherwise error.<input-topic-name>.<application-id> is used
dlqName: ${application.topic.topic2Dlq}
process2-out-0:
producer: # producer properties on each function's output level
streamPartitionerBeanName: streamPartitionerTopic3
management:
endpoints:
web:
exposure:
include: ['health', 'info', 'bindings', 'logfile', 'metrics', 'configprops', 'env', 'kafkastreamstopology']
endpoint:
health:
show-details: ALWAYS
health:
binders:
enabled: true # is default
application:
topic:
topic1: topic1
topic1Error: topic1.err
topic1Dlq: topic1.dlq
topic2: topic2
topic2Error: topic2.err
topic2Dlq: topic2.dlq
topic3: topic3
id:
# the id is used as the prefix for changelog (KTable) topics, so it has to be configurable
process1: process1
process2: process2
package de.datev.pws.loon.dcp.processor;
import de.datev.pws.loon.dcp.exceptions.DocumentedErrorOutput;
import de.datev.pws.loon.dcp.model.Message1;
import de.datev.pws.loon.dcp.model.Message2;
import de.datev.pws.loon.dcp.model.Message3;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
/**
 * Kafka Streams functions {@code process1} and {@code process2} with per-record error routing.
 *
 * <p>Each function hands every incoming record to its processor bean. When the processor throws,
 * the record produces no output on the function's regular output binding. Instead, a
 * {@link DocumentedErrorOutput} (key, value and exception) is sent to the binding
 * {@code <processName>-out-error} through {@link StreamBridge}.
 *
 * <p>NOTE(review): the error record is published through StreamBridge rather than through the
 * Kafka Streams topology, so it is presumably not covered by Kafka Streams commits/transactions.
 * Confirm this if exactly-once error reporting is required.
 */
@Component
public class EventProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);

    private final StreamBridge streamBridge;
    private final Processor1 processor1;
    private final Processor2 processor2;

    /**
     * @param streamBridge used to publish error records to the {@code *-out-error} bindings
     * @param processor1   business logic for {@code process1}
     * @param processor2   business logic for {@code process2}
     */
    public EventProcessor(StreamBridge streamBridge, Processor1 processor1, Processor2 processor2) {
        this.streamBridge = streamBridge;
        this.processor1 = processor1;
        this.processor2 = processor2;
    }

    /**
     * Function for {@code process1}. Each {@code Message1} record is replaced by the key/value pair
     * returned from {@code Processor1.streamProcess}.
     *
     * <p>{@code flatMap} emits exactly one record on success and none on failure. It replaces the
     * former {@code filter()} + {@code map()} pair, which passed the result from one step to the
     * next through a single {@code AtomicReference} captured by the lambdas. That reference is
     * shared by every task and stream thread of the topology. With more than one stream thread
     * (binding {@code concurrency} &gt; 1), a record could be emitted with another record's result.
     */
    @Bean
    public Function<KStream<String, Message1>, KStream<String, Message2>> process1() {
        return input -> input.flatMap((messageKey, messageValue) ->
                processOrReportError("process1", messageKey, messageValue,
                        () -> processor1.streamProcess(messageKey, messageValue)));
    }

    /**
     * Function for {@code process2}. Each {@code Message2} record is replaced by the key/value pair
     * returned from {@code Processor2.streamProcess}.
     *
     * <p>Uses {@code flatMap} for the same reason as {@link #process1()}: no per-record state is
     * shared between pipeline steps.
     */
    @Bean
    public Function<KStream<String, Message2>, KStream<String, Message3>> process2() {
        return input -> input.flatMap((messageKey, messageValue) ->
                processOrReportError("process2", messageKey, messageValue,
                        () -> processor2.streamProcess(messageKey, messageValue)));
    }

    /**
     * Runs one processing step and turns its outcome into zero or one output records.
     *
     * @param processName  function name; selects the {@code <processName>-out-error} binding
     * @param messageKey   key of the incoming record, used for error reporting
     * @param messageValue value of the incoming record, used for error reporting
     * @param processing   the processor call for this record; may throw any exception
     * @param <R>          value type of the produced record
     * @return a single-element list holding the processor's result, or an empty list if it threw
     */
    private <R> List<KeyValue<String, R>> processOrReportError(String processName, String messageKey,
            Object messageValue, Callable<KeyValue<String, R>> processing) {
        try {
            return Collections.singletonList(processing.call());
        } catch (Exception exception) {
            // The error has been sent to the error binding; emitting nothing drops the record
            // from the regular output.
            handleProcessingException(processName, messageKey, messageValue, exception);
            return Collections.emptyList();
        }
    }

    /**
     * Logs a processing failure and sends it, wrapped in a {@link DocumentedErrorOutput}, to the
     * binding {@code <processName>-out-error}.
     *
     * @param processName  function name used to build the error binding name
     * @param messageKey   key of the failed record
     * @param messageValue value of the failed record
     * @param exception    the exception thrown by the processor
     */
    protected void handleProcessingException(String processName, String messageKey, Object messageValue, Exception exception) {
        final DocumentedErrorOutput documentedErrorOutput = new DocumentedErrorOutput(messageKey, messageValue, exception);
        final Message<DocumentedErrorOutput> documentedErrorOutputMessage = MessageBuilder.withPayload(documentedErrorOutput).build();
        final String bindingName = processName + "-out-error";
        LOGGER.error(">>> EXCEPTION in process {} for message={}! Sending problem to out binding \"{}\".", processName, messageValue, bindingName, exception);
        // StreamBridge reports through its boolean result whether the send succeeded; do not
        // drop that silently.
        if (!streamBridge.send(bindingName, documentedErrorOutputMessage)) {
            LOGGER.error(">>> Could not send error record of process {} to out binding \"{}\".", processName, bindingName);
        }
    }

    // Some samples for partitioning

    /** Partitioner referenced by {@code streamPartitionerBeanName} for the topic2 output binding. */
    @Bean
    public StreamPartitioner<String, Message2> streamPartitionerTopic2() {
        LOGGER.info("Performing StreamPartitioner setup for topic2/Message2 using EventProcessor.streamPartitioner");
        return (topicName, key, value, totalPartitions) -> {
            final int partition = partitionFor(key, totalPartitions);
            // DEBUG rather than INFO: this runs once per produced record.
            LOGGER.debug(">>> streamPartitionerTopic2 topicName={} totalPartitions={} key={} partition={}", topicName, totalPartitions, key, partition);
            return partition;
        };
    }

    /** Partitioner referenced by {@code streamPartitionerBeanName} for the topic3 output binding. */
    @Bean
    public StreamPartitioner<String, Message3> streamPartitionerTopic3() {
        LOGGER.info("Performing StreamPartitioner setup for topic3/Message3 using EventProcessor.streamPartitioner");
        return (topicName, key, value, totalPartitions) -> {
            final int partition = partitionFor(key, totalPartitions);
            // DEBUG rather than INFO: this runs once per produced record.
            LOGGER.debug(">>> streamPartitionerTopic3 topicName={} totalPartitions={} key={} partition={}", topicName, totalPartitions, key, partition);
            return partition;
        };
    }

    /**
     * Maps a key to a partition in {@code [0, totalPartitions)}; {@code null} keys go to partition 0.
     *
     * <p>The former {@code Math.abs(hashCode) % totalPartitions} returned a negative partition for
     * {@code hashCode() == Integer.MIN_VALUE}, because {@code Math.abs} overflows there. Taking the
     * absolute value of the remainder gives the same partition for every other hash code and a
     * valid one for {@code Integer.MIN_VALUE}.
     */
    private static int partitionFor(String key, int totalPartitions) {
        return key != null ? Math.abs(key.hashCode() % totalPartitions) : 0;
    }
}
UPDATE:
Example code of a processor that multiplies a value:
/**
 * Example business processor for {@code process2}: derives a {@code Message3} from a
 * {@code Message2}.
 */
@Component
public class Processor2 {

    /**
     * Builds the outgoing record for one incoming {@code Message2}.
     *
     * <p>The request id and start time are copied over. {@code calculatedValue2} is set to
     * {@code calculatedValue1 * 2}. The key is passed through unchanged.
     *
     * @param key      record key
     * @param message2 incoming value; a {@code null} value makes the getter calls throw a
     *                 {@code NullPointerException}
     * @return the unchanged key paired with the derived {@code Message3}
     */
    public KeyValue<String, Message3> streamProcess(String key, Message2 message2) {
        return KeyValue.pair(key, Message3.builder()
                .requestId(message2.getRequestId())
                .startTime(message2.getStartTime())
                .calculatedValue2(message2.getCalculatedValue1() * 2)
                .build());
    }
}