I see that the following annotations are deprecated in Spring Cloud Stream:
@Input
@Output
@EnableBinding
@StreamListener
Please provide examples and links to documentation showing how to do this in the functional way.
This GitHub repository contains a lot of examples:
https://github.com/spring-cloud/stream-applications
The official documentation explains in detail how to move from the imperative to the functional style in Spring Cloud Stream applications with Kafka Streams, but the approach is the same without it.
Please also check this post:
https://spring.io/blog/2019/10/14/spring-cloud-stream-demystified-and-simplified
There is an example of imperative code (https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_imperative_programming_model) and how it can be rewritten in the functional style.
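For the Kafka Streams case mentioned above, the functional style boils down to exposing a Function bean over KStream. A minimal sketch, assuming the spring-cloud-stream-binder-kafka-streams dependency (class and bean names are illustrative):

import java.util.function.Function;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class UppercaseProcessor {

    // Bound automatically as process-in-0 / process-out-0
    @Bean
    public Function<KStream<String, String>, KStream<String, String>> process() {
        return input -> input.mapValues(String::toUpperCase);
    }
}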
Instead of working with annotation-based configuration, Spring now uses detected beans of Consumer/Function/Supplier to define your streams for you. In older versions, the annotation-based code looked like this:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;

interface InputChannels {

    @Input("input")
    SubscribableChannel input();
}

@EnableBinding(InputChannels.class)
public class PubSubDemo {

    private static final Logger LOG = LoggerFactory.getLogger(PubSubDemo.class);

    @StreamListener("input")
    public void listen(String message) {
        if (LOG.isInfoEnabled()) {
            LOG.info(message);
        }
    }
}
The new version of the code looks like this:
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PubSubDemo {

    private static final Logger LOG = LoggerFactory.getLogger(PubSubDemo.class);

    @Bean
    Consumer<String> input() {
        return str -> {
            if (LOG.isInfoEnabled()) {
                LOG.info(str);
            }
        };
    }
}
Note that the Consumer bean replaces both @StreamListener and @Input.
Regarding configuration: before, you would have had an application.yml looking like this:
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: destination
          group: group
          consumer:
            concurrency: 10
            max-attempts: 3
Now the new configuration will look like this:
spring:
  cloud:
    stream:
      bindings:
        input-in-0:
          destination: destination
          group: group
          consumer:
            concurrency: 10
            max-attempts: 3
Optional: Spring Cloud Stream detects the functional bean on its own. If the binding is not created, declare the function explicitly with the property below:
spring:
  cloud:
    stream:
      function:
        definition: input
The in and out correspond to the type of binding (input or output). The index is the index of the input or output binding; it is always 0 for a typical function with a single input and output.
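For example, a hypothetical function bean named uppercase with one input and one output would get both bindings following this convention (a sketch; the destination names are illustrative):

spring:
  cloud:
    stream:
      bindings:
        uppercase-in-0:
          destination: input-topic
        uppercase-out-0:
          destination: output-topic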
Now let's consider output channels:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

public interface OutputChannels {

    @Output("output")
    MessageChannel output();
}

@Service
@EnableBinding(OutputChannels.class)
class PubSubSendQueue {

    @Autowired
    OutputChannels outputChannels;

    public void publish() {
        outputChannels.output().send(MessageBuilder.withPayload("Hello").build());
    }
}
Now, with functions, the code will be:
import java.util.function.Supplier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;

@Service
class PubSubSendQueue {

    @Bean
    public Supplier<String> output() {
        return () -> "Adam";
    }
}
Add the following to the application.properties file:
spring.cloud.stream.bindings.output-out-0.destination=destination
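Note that a Supplier bean like this is polled by the framework on a fixed schedule (roughly once per second by default). If you need a different interval, the default poller can be tuned, for example like this (a sketch; the value is illustrative):

spring.cloud.stream.poller.fixed-delay=5000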
Chapter 2: Introduction to Spring Boot - Code examples for sending and receiving messages, and Chapter 7: Developing Reactive Microservices - Developing event-driven asynchronous services, show something like:
spring.cloud.function:
  definition: myPublisher;myProcessor;mySubscriber
spring.cloud.stream:
  bindings:
    messageProcessor-in-0:
      destination: products
(Just a practical example, maybe not much explanation in it.) – Kalakalaazar
Here is some more helpful information:
Sending a message
Use the org.springframework.cloud.stream.function.StreamBridge for sending messages.
Before
myDataSource.output().send(message);
After
streamBridge.send("myData-out-0", message);
Replacing a ServiceActivator
Before
@ServiceActivator(inputChannel = MyProcessor.INPUT, outputChannel = MyProcessor.OUTPUT)
public Message<MySuperOutputMessage> transform(Message<MySuperInputMessage> message) { ... }
After
@Bean
Function<Message<MySuperInputMessage>, Message<MySuperOutputMessage>> myCoolFunction() {
return message -> {...};
}
Do not forget to register "myCoolFunction" in the property spring.cloud.function.definition.
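For example (a sketch; the destination names are illustrative), the registration and its bindings in application.properties could look like this:

spring.cloud.function.definition=myCoolFunction
spring.cloud.stream.bindings.myCoolFunction-in-0.destination=my-input-destination
spring.cloud.stream.bindings.myCoolFunction-out-0.destination=my-output-destination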
Until version 2.1.0, Spring Cloud Stream (SCS) used annotation-based programming (as in the guide above).
From that version on, SCS's preferred way of working is the function-based approach.
Both are valid and fully functioning implementations.
Both do the same thing and produce the same result, except that in the annotation-based style the user has to be aware of SCS abstractions (messaging, channels, bindings, and so on), while in the functional style the actual user code has nothing to do with any of them.
Instead of working with annotation-based configuration, Spring now uses detected beans of Consumer/Function/Supplier to define your streams for you.
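To illustrate, a hypothetical application could expose all three functional types at once (a sketch; the bean names match the comment above and are otherwise illustrative):

import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StreamDefinitions {

    @Bean
    public Supplier<String> myPublisher() {
        return () -> "hello";                 // polled source, bound as myPublisher-out-0
    }

    @Bean
    public Function<String, String> myProcessor() {
        return String::toUpperCase;           // processor, bound as myProcessor-in-0 / myProcessor-out-0
    }

    @Bean
    public Consumer<String> mySubscriber() {
        return message -> System.out.println(message);   // sink, bound as mySubscriber-in-0
    }
}

They would then be activated together with spring.cloud.function.definition=myPublisher;myProcessor;mySubscriber.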
I wrote a blog post & guide about Spring Cloud Stream functional programming: https://shaikezam.com/article/spring_cloud_stream_functional