EnableBinding, Output, Input deprecated since version 3.1 of Spring Cloud Stream

Since version 3.1 the major API for working with queues is deprecated. In the class comment it says:

Deprecated as of 3.1 in favor of functional programming model

I searched the web extensively but didn't find a solid end-to-end explanation of how I should migrate.

Looking for examples for:

  1. read from queue
  2. write to queue

If there are several ways to do that (as I saw on the web), I'd be glad for an explanation of each option and its typical use case as well.

Nicolina answered 31/1, 2021 at 10:30 Comment(1)
Does this answer your question? @EnableBinding @deprecated as of 3.1 in favor of functional programming model - Caribou
  1. I'm assuming you are already familiar with the main concepts, so I will focus on the migration.
  2. I'm using Kotlin for the demo code, to reduce verbosity.

First, some references which may help:

  • Here is the initial relevant doc: link
  • This is an explanation for the naming scheme in the new functional format: link
  • This is a more detailed explanation with some more advanced scenarios: link

TL;DR

Instead of working with annotation-based configuration, Spring now detects beans of type Consumer, Function, or Supplier and defines your streams from them.
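To make the three roles concrete, here is a minimal sketch in plain Kotlin, without Spring, that wires a Supplier, a Function, and a Consumer together by hand. The Grade type and the names gradeSource, curveGrade, and gradeSink are made up for this illustration. In a real application Spring Cloud Stream does the wiring and connects each bean to a binding.

```kotlin
import java.util.function.Consumer
import java.util.function.Function
import java.util.function.Supplier

// Hypothetical payload type, used only for this illustration.
data class Grade(val student: String, val score: Int)

// Supplier: only produces messages (would map to an output binding).
val gradeSource: Supplier<Grade> = Supplier<Grade> { Grade("Adam", 90) }

// Function: consumes one message and produces another (input and output binding).
val curveGrade: Function<Grade, Grade> =
    Function<Grade, Grade> { grade -> grade.copy(score = minOf(100, grade.score + 5)) }

// Consumer: only consumes messages (would map to an input binding).
val received = mutableListOf<Grade>()
val gradeSink: Consumer<Grade> = Consumer<Grade> { grade -> received.add(grade) }

fun main() {
    // Without Spring, we connect the three pieces manually to show the data flow.
    gradeSink.accept(curveGrade.apply(gradeSource.get()))
    println(received)
}
```

The point is only that these are ordinary java.util.function types, so the business logic can be exercised without any messaging infrastructure.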

Input/Consumer

Whereas before you had code looking like this:

interface BindableGradesChannel {
    @Input
    fun gradesChannel(): SubscribableChannel

    companion object {
        const val INPUT = "gradesChannel"
    }
}

and the usage was similar to:

@Service
@EnableBinding(BindableGradesChannel::class)
class GradesListener {
    private val log = LoggerFactory.getLogger(GradesListener::class.java)
    
    @StreamListener(BindableGradesChannel.INPUT)
    fun listen(grade: Grade) {
        log.info("Received $grade")
        // do something
    }
}

Now the entire interface definition is unnecessary, and the listener can be written like so:

@Service
class GradesListener {
    private val log = LoggerFactory.getLogger(GradesListener::class.java)

    @Bean
    fun gradesChannel(): Consumer<Grade> {
        return Consumer { listen(grade = it) }
    }
    
    fun listen(grade: Grade) {
        log.info("Received $grade")
        // do something
    }
}

Notice how the Consumer bean replaced both the @StreamListener and the @Input.

Regarding the configuration, if your application.yml previously looked like so:

spring:
  cloud:
    stream:
      bindings:
        gradesChannel:
          destination: GradesExchange
          group: grades-updates
          consumer:
            concurrency: 10
            max-attempts: 3

now it should look like so:

spring:
  cloud:
    stream:
      bindings:
        gradesChannel-in-0:
          destination: GradesExchange
          group: grades-updates
          consumer:
            concurrency: 10
            max-attempts: 3

Notice how gradesChannel was replaced by gradesChannel-in-0. To understand the full naming convention, please see the naming convention link at the top.
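As a rough illustration of that convention (function name, then in or out, then the argument index), here is a small helper that builds binding names. The Direction enum and the bindingName function are hypothetical and exist only for this sketch. They are not part of Spring Cloud Stream.

```kotlin
// Direction of a binding as it appears in the generated name.
enum class Direction(val token: String) { IN("in"), OUT("out") }

// Builds "<functionName>-<in|out>-<index>"; index is 0 for single-input/output functions.
fun bindingName(functionName: String, direction: Direction, index: Int = 0): String {
    require(functionName.isNotBlank()) { "functionName must not be blank" }
    require(index >= 0) { "index must not be negative" }
    return "$functionName-${direction.token}-$index"
}

fun main() {
    println(bindingName("gradesChannel", Direction.IN))    // gradesChannel-in-0
    println(bindingName("studentsChannel", Direction.OUT)) // studentsChannel-out-0
}
```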

Some details:

  1. If you have more than one such bean in your application, you need to define the spring.cloud.function.definition property so Spring knows which beans to bind.
  2. You can give your channels custom names. If you'd like to keep using gradesChannel, set spring.cloud.stream.function.bindings.gradesChannel-in-0=gradesChannel and then use gradesChannel everywhere in the configuration.
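Put together, the two details above might look like this in application.yml. This is a sketch, assuming the application has two functional beans named gradesChannel and studentsChannel. Multiple names in the definition are separated with a semicolon.

```yaml
spring:
  cloud:
    function:
      # Lists the functional beans to bind (assumes both beans exist in the application)
      definition: gradesChannel;studentsChannel
    stream:
      function:
        bindings:
          # Maps the generated binding name to a custom one
          gradesChannel-in-0: gradesChannel
      bindings:
        # The custom name can now be used for the binding properties
        gradesChannel:
          destination: GradesExchange
          group: grades-updates
```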

Output/Supplier

The concept here is similar. Code looking like this:

interface BindableStudentsChannel {
    @Output
    fun studentsChannel(): MessageChannel
}

and

@Service
@EnableBinding(BindableStudentsChannel::class)
class StudentsQueueWriter(private val studentsChannel: BindableStudentsChannel) {
    fun publish(message: Message<Student>) {
        studentsChannel.studentsChannel().send(message)
    }
}

can now be replaced by:

@Service
class StudentsQueueWriter {
    @Bean
    fun studentsChannel(): Supplier<Student> {
        return Supplier { Student("Adam") }
    }
}

As you can see, there is a major difference: when is it called, and by whom?

Previously we could trigger it manually, but now it is triggered by Spring, every second by default. This is fine for use cases such as publishing sensor data every second, but not when you want to send a message in response to an event. Apart from using a Function instead, Spring offers two alternatives:

StreamBridge - link

Using StreamBridge you can define the target explicitly like so:

@Service
class StudentsQueueWriter(private val streamBridge: StreamBridge) {
    fun publish(message: Message<Student>) {
        streamBridge.send("studentsChannel-out-0", message)
    }
}

This way you don't define the target channel as a bean, but you can still send the message. The downside is that the binding name is hard-coded in your class.
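The binding name passed to StreamBridge can be configured like any other output binding. A minimal sketch, assuming a destination called StudentsExchange (a made-up name, chosen to mirror GradesExchange above):

```yaml
spring:
  cloud:
    stream:
      bindings:
        studentsChannel-out-0:
          # Hypothetical destination name, replace with your own exchange or topic
          destination: StudentsExchange
```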

Reactor API - link

The other way is to use a reactive mechanism such as Sinks.Many and to return it from the Supplier as a Flux. With this approach your code will look similar to:

@Service
class StudentsQueueWriter {
    val students: Sinks.Many<Student> = Sinks.many().multicast().onBackpressureBuffer()
    @Bean
    fun studentsChannel(): Supplier<Flux<Student>> {
        return Supplier { students.asFlux() }
    }
}

and the usage may be similar to:

class MyClass(val studentsQueueWriter: StudentsQueueWriter) {
    fun newStudent() {
        studentsQueueWriter.students.tryEmitNext(Student("Adam"))
    }
}
Paregoric answered 31/1, 2021 at 12:24 Comment(14)
Also, please see the relevant section in the doc as well as these two blog posts - spring.io/blog/2019/10/14/… and spring.io/blog/2019/10/17/… - Furtherance
Nice references, thanks! May I add them to the reference section at the top? - Paregoric
If you think that helps, sure - Furtherance
Beautiful answer. Thanks! - Provincialism
Beautiful answer, with detailed example :) - Oreilly
@Paregoric Unfortunately, EmitterProcessor is also deprecated now and will be removed soon. Do you have an example for another alternative? I want to avoid the explicit configuration in code like in the StreamBridge example. - Yaroslavl
@Yaroslavl I think you can try using val students: Sinks.Many<String> = Sinks.many().multicast().onBackpressureBuffer() instead, and in the bean have Supplier { students.asFlux() } and the usage should be: students.tryEmitNext(Student("Hello")) - Paregoric
If this works for you, we can edit the question to reflect this - Paregoric
@Paregoric Hard to tell if it works - I implemented it as you suggested in my code, and the result of tryEmitNext is "OK", but no message shows up in RabbitMQ. But I'm also not sure if the bindings are configured correctly. I think I will post a separate question regarding this. - Yaroslavl
Does it work if you use the deprecated EmitterProcessor? If so, the problem is with the Sink, otherwise it is probably a configuration error - Paregoric
Yeah, apparently it's some config error, it also does not work with EmitterProcessor. Although I can see that the beans are created, no exchange or queue shows up and the messages just disappear. I will try to check the config. - Yaroslavl
Can you mix the old producer (not refactored, which sends the output) with the new functional consumer (which receives the input)? - Aleksandrovsk
I think you can, but untested. Also take note - the old approach is deprecated, so in a few versions it will probably be removed. - Paregoric
By the way, I checked removing the EmitterProcessor and it works - I'll update the answer to use the non-deprecated option - Paregoric
