@EnableBinding @deprecated as of 3.1 in favor of functional programming model
Asked Answered
H

4

29

I see that following annotations are depreciated for Spring Cloud Stream

@Input @Output @EnableBinding @StreamListener

Please provide examples and links to documentation as how to do it in functional way.

Holster answered 5/1, 2021 at 9:15 Comment(0)
M
15

This github repository contains a lot of examples..
https://github.com/spring-cloud/stream-applications

The official documentation explains in details how to move from imperative to functional style in spring cloud stream applications with kafka streams but is the same without it.

https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#spring_cloud_function

Please also check this post..
https://spring.io/blog/2019/10/14/spring-cloud-stream-demystified-and-simplified

https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_programming_model

There is an example of imperative code (https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_imperative_programming_model) and how it should be developed with functional style.

Methylene answered 5/1, 2021 at 11:2 Comment(1)
There is also an example in Microservices with Spring Boot and Spring Cloud: Chapter 2: Introduction to Spring Boot - Code examples for sending and receiving messages & Chapter 7: Developing Reactive Microservices - Developing event-driven asynchronous services. Which shows something like: spring.cloud.function: definition: myPublisher;myProcessor;mySubscriber spring.cloud.stream: bindings.messageProcessor-in-0: destination: products (Just a practical ex, maybe not much explanation in it.)Kalakalaazar
S
33

Instead of working with annotation-based configuration, spring now uses detected beans of Consumer/Function/Supplier to define your streams for you. Older version the code with annotation looks like below:

interface InputChannels {
    
        @Input("input")
        SubscribableChannel input();
    }

    @EnableBinding(InputChannels.class)
    public class PubSubDemo {
    @StreamListener("input")
    public void listen() {
            
        if (LOG.isInfoEnabled()) {
            LOG.info(context.toString());
        }
    }

New version code will be like :

public class PubSubDemo {
   @Bean
   Consumer<String> input() {
    return str -> {
        
        if (LOG.isInfoEnabled()) {
            LOG.info(context.toString());
             }
        };
   }
}

Check Consumer bean replaced the @StreamListener and the @Input.

Regarding the configuration, if before in order to configure you had an application.yml looking like so:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: destination
          group: group
          consumer:
            concurrency: 10
            max-attempts: 3

Now new configuration will be like

spring:
  cloud:
    stream:
      bindings:
        input-in-0:
          destination: destination
          group: group
          consumer:
            concurrency: 10
            max-attempts: 3

Optional: Spring cloud stream lib creates the bean by its own. If the bean is not created. use below properties.

spring:
   cloud:
     stream:
        function:
           definition: input

The in and out corresponds to the type of binding (such as input or output). The index is the index of the input or output binding. It is always 0 for typical single input/output function.

Now let consider Output channels:

    public interface OutputChannels {
        @Output
        MessageChannel output();
    }

    @Service
    @EnableBinding(OutputChannels.class)
    class PubSubSendQueue {
    
    OutputChannels  outputChannel;

    public void publish() {
        outputChannel.output().send("Hello");
    }
}

Now with the Functions code will be as :

@Service
class PubSubSendQueue {
    @Bean
    public Supplier<String> output(){
        return Supplier { "Adam" }
    }
}

Add below in application.properties file

spring.cloud.stream.bindings.output-out-0.destination=destination
Sampler answered 21/4, 2021 at 9:31 Comment(2)
Thanks for the example, this did help me a lot. I had trovel understanding how the method was referenced, at the end I found that input-in-0: the input is in fact the method name. Would also add @Service to the consumer example just to have a working example. Thanks again!Dainedainty
can you please share your source? I need to find the docs for the replacementsTightrope
M
15

This github repository contains a lot of examples..
https://github.com/spring-cloud/stream-applications

The official documentation explains in details how to move from imperative to functional style in spring cloud stream applications with kafka streams but is the same without it.

https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#spring_cloud_function

Please also check this post..
https://spring.io/blog/2019/10/14/spring-cloud-stream-demystified-and-simplified

https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_programming_model

There is an example of imperative code (https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_imperative_programming_model) and how it should be developed with functional style.

Methylene answered 5/1, 2021 at 11:2 Comment(1)
There is also an example in Microservices with Spring Boot and Spring Cloud: Chapter 2: Introduction to Spring Boot - Code examples for sending and receiving messages & Chapter 7: Developing Reactive Microservices - Developing event-driven asynchronous services. Which shows something like: spring.cloud.function: definition: myPublisher;myProcessor;mySubscriber spring.cloud.stream: bindings.messageProcessor-in-0: destination: products (Just a practical ex, maybe not much explanation in it.)Kalakalaazar
U
4

Here some more helpful information:

Sending a message

Use the org.springframework.cloud.stream.function.StreamBridge for sending messages.

Before

myDataSource.output().send(message);

After

streamBridge.send("myData-out-0", message);

Replacing a ServiceActivator

Before

@ServiceActivator(inputChannel = MyProcessor.INPUT, outputChannel = MyProcessor.OUTPUT)
public Message<MySuperOutputMessage> transform(Message<MySuperInputMessage> message) { ... }

After

@Bean
Function<Message<MySuperInputMessage>, Message<MySuperOutputMessage>> myCoolFunction() {
    return message -> {...}; 
}

Do not forget to register "myCoolFunction" in the properties spring.cloud.function.definition.

Underwood answered 10/12, 2021 at 19:6 Comment(0)
E
2

Until version v2.1.0, SCS (Spring Cloud Stream) worked as annotation-based programing (just as the previous guide).

From this version, SCS preferred way of work is using the function based.

Both are valid and fully functioning implementations.

Both do the same thing and both produce the same result – except that, in the annotation-based, the user has to be aware of SCS abstractions (that is, messaging, channels, binding, and so on) while the actual user code has nothing to do with any of them.

Instead of working with annotation-based configuration, spring now uses detected beans of Consumer/Function/Supplier to define your streams for you.

I wrote blog & guide about Spring Cloud Stream Functional Programming: https://shaikezam.com/article/spring_cloud_stream_functional

Em answered 16/1, 2022 at 8:15 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.