Spring 5 Reactive WebSockets: recommended use

I've been learning about Spring 5 WebFlux, reactive programming, and WebSockets, and I've watched Josh Long's "Spring Tips: Reactive WebSockets with Spring Framework 5". In that video, the code that sends data from server to client over a WebSocket connection uses a Spring Integration IntegrationFlow that publishes to a PublishSubscribeChannel. A custom MessageHandler subscribed to that channel takes each message, converts it to an object, serializes that object to JSON, and emits it to the FluxSink obtained from the callback passed to Flux.create(). The resulting Flux is what gets sent over the WebSocket connection.

I was wondering if the use of IntegrationFlow and PublishSubscribeChannel is the recommended way to push events from a background process to the client, or if this is just more convenient in this particular example (monitoring the file system). I'd think if you have control over the background process, you could have it emit to the FluxSink directly?

I'm thinking about use cases similar to the following:

  • a machine learning process whose progress is monitored
  • updates to the state of a game world that are sent to players
  • chat rooms / team collaboration software
  • ...
Colewort answered 5/10, 2017 at 20:28 Comment(3)
Quite interested in the answer to this as well. Reactor's Flux isn't a message bus, so maybe that's why a Channel is more appropriate. - Embellishment
Quite interested in an answer as well. - Lynxeyed
I don't see any reason not to publish background events directly to a FluxSink. It seems quite a bit simpler to implement. - Piecedyed
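
To make the "emit directly" idea from the last comment concrete, here is a minimal sketch that needs only the JDK. It is not the code from the video and not Reactor: it uses java.util.concurrent.SubmissionPublisher as a stand-in for the FluxSink, since both let a background process push items to subscribers with no message channel in between. ProgressFeed, publish, subscribe, and the sample messages are hypothetical names chosen for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Consumer;

/** Hypothetical feed that a background job writes to directly. */
public class ProgressFeed implements AutoCloseable {

    // Assumption: JDK stand-in for a FluxSink, a hot multicast publisher
    // that buffers items per subscriber.
    private final SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

    /** Called by the background process whenever it has something to report. */
    public void publish(String message) {
        // submit() blocks while any subscriber's buffer is full.
        publisher.submit(message);
    }

    /** Each client (for example, one WebSocket session) attaches its own consumer. */
    public CompletableFuture<Void> subscribe(Consumer<String> client) {
        // The returned future completes once the feed is closed and drained.
        return publisher.consume(client);
    }

    @Override
    public void close() {
        publisher.close();
    }

    public static void main(String[] args) {
        List<String> clientA = new CopyOnWriteArrayList<>();
        List<String> clientB = new CopyOnWriteArrayList<>();

        ProgressFeed feed = new ProgressFeed();
        CompletableFuture<Void> a = feed.subscribe(clientA::add);
        CompletableFuture<Void> b = feed.subscribe(clientB::add);

        // The "background process" pushes straight into the feed.
        feed.publish("epoch 1 done");
        feed.publish("epoch 2 done");
        feed.close();

        // Wait until both clients have seen the completion signal.
        CompletableFuture.allOf(a, b).join();
        System.out.println(clientA); // [epoch 1 done, epoch 2 done]
        System.out.println(clientB); // [epoch 1 done, epoch 2 done]
    }
}
```

In a WebFlux application the same shape would more likely be built on Flux.create() or another Reactor publisher, with each WebSocket session mapping the shared stream to its own outgoing messages. The point of the sketch is only that the producer can own the publishing side itself when you control it.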

What I've done in the past that has worked for me is to create a Spring Component that implements WebSocketHandler:

@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {

Spring then calls the handle method once per connection and passes in the WebSocketSession:

@Override
public Mono<Void> handle(WebSocketSession session) {

Then create one or more Flux publishers that emit the messages (WebSocketMessage) for the client, and pass them to session.send():

    final var output = session.send(Flux.merge(flux1, flux2));

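Then combine the inbound and outbound sides with Mono.zip and return the resulting Mono<Void>; Spring subscribes to it and takes it from there. The variable names in this last snippet differ from the one above: the first argument is the inbound Flux reduced to a Mono<Void> with then(), and outputWithErrorMsgs appears to correspond to output.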

        return Mono.zip(incomingWebsocketMsgResponse.getWebSocketMsgFlux().then(),
                outputWithErrorMsgs)
                .then();

Example: https://howtodoinjava.com/spring-webflux/reactive-websockets/

Pitsaw answered 1/12, 2020 at 18:53 Comment(0)

Since this question, Spring introduced RSocket support - you might think about it like the WebSocket STOMP support existing in Spring MVC, but much more powerful and efficient, supporting backpressure and advanced communication patterns at the protocol level.

For the use cases you're mentioning, I'd advise using RSocket as you'd get a powerful programming model with @MessageMapping and all the expected support in Spring (codecs for JSON and CBOR, security, etc).

Wireman answered 2/11, 2019 at 9:56 Comment(3)
RSocket is great since it brings networking into the reactive paradigm as a first-class citizen. However, RSocket and WebSockets are not mutually exclusive. RSocket is a protocol that runs on TCP or WebSockets. The WebSocket protocol runs on layer 7 of the OSI model, like HTTP/2, while RSocket runs on OSI layer 5/6. - Pitsaw
Unfortunately, the RSocket WebSocket client for JavaScript is currently only at 0.0.19 and already 10 months old. See npmjs.com/package/rsocket-websocket-client - Colewort
RSocket itself is only at 0.2. - Pitsaw
