- I'm assuming you are already familiar with the main concepts, and will focus on the migration.
- I'm using Kotlin for the demo code, to reduce verbosity.
First, some references which may help:
- Here is the initial relevant doc: link
- This is an explanation for the naming scheme in the new functional format: link
- This is a more detailed explanation with some more advanced scenarios: link
TL;DR
Instead of working with annotation-based configuration, Spring now detects beans of type Consumer/Function/Supplier and uses them to define your stream bindings for you.
Input/Consumer
Whereas before you had code looking like this:
    interface BindableGradesChannel {
        @Input
        fun gradesChannel(): SubscribableChannel

        companion object {
            const val INPUT = "gradesChannel"
        }
    }
and the usage was similar to:
    @Service
    @EnableBinding(BindableGradesChannel::class)
    class GradesListener {
        private val log = LoggerFactory.getLogger(GradesListener::class.java)

        // Subscribes to the channel declared in BindableGradesChannel
        @StreamListener(BindableGradesChannel.INPUT)
        fun listen(grade: Grade) {
            log.info("Received $grade")
            // do something
        }
    }
now the interface definition is no longer needed, and the listener can be written like so:
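    import org.slf4j.LoggerFactory
    import org.springframework.context.annotation.Bean
    import org.springframework.stereotype.Service
    import java.util.function.Consumer

    @Service
    class GradesListener {
        private val log = LoggerFactory.getLogger(GradesListener::class.java)

        // The bean name (gradesChannel) determines the binding name: gradesChannel-in-0
        @Bean
        fun gradesChannel(): Consumer<Grade> {
            return Consumer { listen(grade = it) }
        }

        fun listen(grade: Grade) {
            log.info("Received $grade")
            // do something
        }
    }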
Notice how the Consumer bean replaced both the @StreamListener and the @Input.
Regarding the configuration, if your application.yml previously looked like this:
    spring:
      cloud:
        stream:
          bindings:
            gradesChannel:
              destination: GradesExchange
              group: grades-updates
              consumer:
                concurrency: 10
                max-attempts: 3
now it should be like so:
    spring:
      cloud:
        stream:
          bindings:
            gradesChannel-in-0:
              destination: GradesExchange
              group: grades-updates
              consumer:
                concurrency: 10
                max-attempts: 3
Notice how gradesChannel was replaced by gradesChannel-in-0. To understand the full naming convention, please see the naming convention link at the top.
Some details:
- If you have more than one such bean in your application, you need to define the spring.cloud.function.definition property.
- You have the option to give your bindings custom names, so if you'd like to continue using gradesChannel you can set spring.cloud.stream.function.bindings.gradesChannel-in-0=gradesChannel and use gradesChannel everywhere in the configuration.
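As a rough sketch of the two points above, the application.yml might look something like this. It assumes a second functional bean named studentsChannel exists alongside gradesChannel (used here purely for illustration); multiple names in spring.cloud.function.definition are separated by semicolons.

```yaml
spring:
  cloud:
    function:
      # studentsChannel is an assumed second bean; list every functional bean you want bound
      definition: gradesChannel;studentsChannel
    stream:
      function:
        bindings:
          # map the generated binding name to a custom one
          gradesChannel-in-0: gradesChannel
      bindings:
        # the custom name is now used in place of gradesChannel-in-0
        gradesChannel:
          destination: GradesExchange
          group: grades-updates
```

With the mapping in place, the rest of the binding configuration can stay as it was before the migration.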
Output/Supplier
The concept here is similar. Config and code looking like this:
    interface BindableStudentsChannel {
        @Output
        fun studentsChannel(): MessageChannel
    }
and
    @Service
    @EnableBinding(BindableStudentsChannel::class)
    class StudentsQueueWriter(private val studentsChannel: BindableStudentsChannel) {
        fun publish(message: Message<Student>) {
            studentsChannel.studentsChannel().send(message)
        }
    }
can now be replaced by:
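    import org.springframework.context.annotation.Bean
    import org.springframework.stereotype.Service
    import java.util.function.Supplier

    @Service
    class StudentsQueueWriter {
        // Bound as studentsChannel-out-0; Spring calls the supplier, not your code
        @Bean
        fun studentsChannel(): Supplier<Student> {
            return Supplier { Student("Adam") }
        }
    }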
As you can see, there is a major difference: when is it called, and by whom?
Before, we could trigger it manually, but now it is triggered by Spring, every second (by default). This is fine for use cases such as publishing sensor data every second, but it is not suitable when you want to send a message in response to an event. Apart from using a Function, Spring offers 2 alternatives:
StreamBridge - link
Using StreamBridge you can define the target explicitly, like so:
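    import org.springframework.cloud.stream.function.StreamBridge
    import org.springframework.messaging.Message
    import org.springframework.stereotype.Service

    @Service
    class StudentsQueueWriter(private val streamBridge: StreamBridge) {
        fun publish(message: Message<Student>) {
            // The first argument is the output binding name to send to
            streamBridge.send("studentsChannel-out-0", message)
        }
    }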
This way you don't define the target channel as a bean, but you can still send the message. The downside is that you have some explicit configuration in your class.
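The binding name passed to send can still be configured in application.yml like any other output binding. A minimal sketch, assuming a destination called StudentsExchange (a made-up name for illustration, not taken from the original setup):

```yaml
spring:
  cloud:
    stream:
      bindings:
        studentsChannel-out-0:
          # StudentsExchange is an assumed destination name
          destination: StudentsExchange
```

This keeps the destination itself out of the class, so only the binding name remains hard-coded there.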
Reactor API - link
The other way is to use a reactive mechanism such as Sinks.Many, and to return it as a Flux. With this approach, your code will look similar to:
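    import org.springframework.context.annotation.Bean
    import org.springframework.stereotype.Service
    import reactor.core.publisher.Flux
    import reactor.core.publisher.Sinks
    import java.util.function.Supplier

    @Service
    class StudentsQueueWriter {
        // The sink's element type must match the Flux<Student> exposed below
        val students: Sinks.Many<Student> = Sinks.many().multicast().onBackpressureBuffer()

        @Bean
        fun studentsChannel(): Supplier<Flux<Student>> {
            return Supplier { students.asFlux() }
        }
    }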
and the usage may be similar to:
    class MyClass(val studentsQueueWriter: StudentsQueueWriter) {
        fun newStudent() {
            studentsQueueWriter.students.tryEmitNext(Student("Adam"))
        }
    }