I'm developing a Spring Boot WebFlux application for real-time IoT data visualization.
I have a Flux which simulates data coming from a device, and once the websocket connection is established I want every single event to be:
- sent over the websocket for real-time visualization (using a reactive WebSocketHandler)
- checked against a given condition, so that a notification can be sent via an HTTP REST call (RestTemplate)
From my logs it seems the two subscribers (the websocket handler and the notifier) get two different flows with completely different values (logs below).
I also tried a variant that chains the share method after the map in the MySource class. In that case it looks like, although I have just one Flux, there is only ONE thread, so everything is blocking (I can see the REST call blocking the send over the websocket).
What is happening here? How can I make the two subscribers run in different execution contexts (different threads), completely independent of each other?
Relevant code snippets and logs are below.
Thank you all in advance!
UPDATE: for the sake of clarity, I should specify that MyEvents have RANDOMLY generated values. I solved one issue thanks to @NikolaB's answer by using ConnectableFlux / share, which guarantees that both subscribers see the same Flux, but I still want separate execution contexts for the two subscribers.
public class MyWebSocketHandler implements WebSocketHandler {

    @Autowired
    public MySource mySource;

    @Autowired
    public Notifier notifier;

    public Mono<Void> handle(WebSocketSession webSocketSession) {
        Flux<MyEvent> events = mySource.events();
        // First subscription: the notifier
        events.subscribe(event -> notifier.sendNotification(event));
        // Second subscription: made by the websocket session when it sends
        return webSocketSession.send(events.map(this::toJson).map(webSocketSession::textMessage));
    }

    private String toJson(MyEvent event) {
        log.info("websocket toJson " + event.getValue());
        ...
    }
}

public class MySource {

    public Flux<MyEvent> events() {
        return Flux.interval(...).map(i -> new MyEvent(*Random Generate Value*));
    }
}

public class Notifier {

    public void sendNotification(MyEvent event) {
        log.info("notifier sendNotification " + event.getValue());
        if (condition met)
            restTemplate.exchange(...);
    }
}
2019-11-19 11:58:55.375 INFO [ parallel-3] i.a.m.websocket.MyWebSocketHandler : websocket toJson 4.09
2019-11-19 11:58:55.375 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.86
2019-11-19 11:58:57.366 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.24
2019-11-19 11:58:57.374 INFO [ parallel-3] i.a.m.websocket.MyWebSocketHandler : websocket toJson 4.11
2019-11-19 11:58:59.365 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.61
2019-11-19 11:58:59.374 INFO [ parallel-3] i.a.m.websocket.MyWebSocketHandler : websocket toJson 4.03
2019-11-19 11:59:01.365 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.88
2019-11-19 11:59:01.375 INFO [ parallel-3] i.a.m.websocket.MyWebSocketHandler : websocket toJson 4.29
2019-11-19 11:59:03.364 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.37
ConnectableFlux/share so now the single Flux source is guaranteed, but the problem is that sendNotification is called by the same thread that executes the websocket stuff. What I want to achieve is a separation of execution contexts for subscribers. Is it possible? I feel I have to add subscribeOn somewhere but I'm not sure... – Flamen