How to have multiple subscribers to Flux that run on different execution contexts / threads

I'm developing a Spring Boot WebFlux application for real-time IoT data visualization.

I have a Flux that simulates data coming from a device. Once the websocket connection is established, every single event:

  • must be sent over the websocket for real-time visualization (using a reactive WebSocketHandler)
  • must be checked against a given condition, so that a notification can be sent via an HTTP REST call (RestTemplate)

From my logs, it seems the two subscribers (the websocket handler and the notifier) get two different flows with completely different values (logs below).

I also tried a variant that chains the share method after the map in the MySource class. In that case, although there seems to be just one Flux, there is only ONE thread, so everything blocks (I can see the REST call blocking the send over the websocket).

What is happening here? How can I make the two subscribers run in different execution contexts (different threads), completely independent of each other?

The relevant code snippets and logs are below.

Thank you all in advance!

UPDATE: for the sake of clarity, I should specify that MyEvent instances carry RANDOMLY generated values. Thanks to @NikolaB's answer I solved one issue by using ConnectableFlux / share, which guarantees a single shared Flux, but I still want separate execution contexts for the two subscribers.

public class MyWebSocketHandler implements WebSocketHandler {

   @Autowired
   public MySource mySource;

   @Autowired
   public Notifier notifier;

   public Mono<Void> handle(WebSocketSession webSocketSession) {
      Flux<MyEvent> events = mySource.events();
      events.subscribe(event -> notifier.sendNotification(event));
      return webSocketSession.send(events.map(this::toJson).map(webSocketSession::textMessage));
   }

   private String toJson(MyEvent event) {
       log.info("websocket toJson " + event.getValue());
       ...
   }
}

public class MySource {
   public Flux<MyEvent> events() {
      return Flux.interval(...).map(i -> new MyEvent(/* randomly generated value */));
   }
}

public class Notifier {

   public void sendNotification (MyEvent event) {
      log.info("notifier sendNotification " + event.getValue());
      if (/* condition met */) {
         restTemplate.exchange(...);
      }
   }
}

2019-11-19 11:58:55.375 INFO [     parallel-3] i.a.m.websocket.MyWebSocketHandler  : websocket toJson 4.09
2019-11-19 11:58:55.375 INFO [     parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.86
2019-11-19 11:58:57.366 INFO [     parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.24
2019-11-19 11:58:57.374 INFO [     parallel-3] i.a.m.websocket.MyWebSocketHandler  : websocket toJson 4.11
2019-11-19 11:58:59.365 INFO [     parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.61
2019-11-19 11:58:59.374 INFO [     parallel-3] i.a.m.websocket.MyWebSocketHandler  : websocket toJson 4.03
2019-11-19 11:59:01.365 INFO [     parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.88
2019-11-19 11:59:01.375 INFO [     parallel-3] i.a.m.websocket.MyWebSocketHandler  : websocket toJson 4.29
2019-11-19 11:59:03.364 INFO [     parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.37
Flamen answered 19/11, 2019 at 11:13 Comment(0)

There are a couple of issues here. First, RestTemplate is a synchronous, blocking HTTP client, so you should use WebClient, which is reactive. Second, to get a Flux that is shared by multiple subscribers (share() is shorthand for publish().refCount() on a ConnectableFlux), you need to share it before the map operator and derive new Fluxes from the shared one.

Example:

Flux<MyEvent> connectedFlux = mySource.events().share();
Flux.from(connectedFlux).subscribe(event -> notifier.sendNotification(event));
return webSocketSession.send(Flux.from(connectedFlux).map(this::toJson).map(webSocketSession::textMessage));

Also, the sendNotification method should return Mono<Void>, since reactive methods should always return Mono or Flux types.

To initiate independent executions, you could zip those two Monos.

Edit

First of all, as mentioned above, use WebClient, which is a reactive HTTP client, for outgoing HTTP calls, and rework the Notifier class:

public class Notifier {

   public Mono<Void> sendNotification (MyEvent event) {
      log.info("notifier sendNotification " + event.getValue());
      return Mono.just(event)
                 .filter(e -> /* your condition */)
                 .flatMap(e -> WebClient.builder().baseUrl("XXX")...)
                 .then();
   }

}

Now check whether the execution context is different.
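
As an illustration of giving each subscriber its own execution context, here is a minimal sketch that uses plain Reactor only (no WebSocket or WebClient involved). It assumes that the notifier branch does blocking work, simulated by a hypothetical slowBlockingCall(), and that placing publishOn on each branch is acceptable in your setup. The scheduler names "ws" and "notifier" are made up for the example, and publish().autoConnect(2) is used instead of share() only so that the source starts once both subscribers are attached.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class SeparateContextsDemo {

    // Each entry is "<thread name>:<value>" as seen by one subscriber.
    static final List<String> websocketLog = new CopyOnWriteArrayList<>();
    static final List<String> notifierLog = new CopyOnWriteArrayList<>();

    static void run() {
        websocketLog.clear();
        notifierLog.clear();
        // Hypothetical scheduler names; threads are named "<name>-<n>".
        Scheduler wsScheduler = Schedulers.newSingle("ws");
        Scheduler notifierScheduler = Schedulers.newBoundedElastic(2, 100, "notifier");
        CountDownLatch done = new CountDownLatch(2);

        // One upstream subscription, multicast to both branches once two subscribers exist.
        Flux<Long> shared = Flux.interval(Duration.ofMillis(20))
                .take(5)
                .publish()
                .autoConnect(2);

        // Branch 1: stands in for the websocket send.
        shared.publishOn(wsScheduler)
                .subscribe(v -> websocketLog.add(threadName() + ":" + v),
                        e -> done.countDown(),
                        done::countDown);

        // Branch 2: stands in for the notifier; blocking here does not stall branch 1.
        shared.publishOn(notifierScheduler)
                .subscribe(v -> {
                            slowBlockingCall();
                            notifierLog.add(threadName() + ":" + v);
                        },
                        e -> done.countDown(),
                        done::countDown);

        try {
            done.await(5, TimeUnit.SECONDS); // wait until both branches have completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            wsScheduler.dispose();
            notifierScheduler.dispose();
        }
    }

    static String threadName() {
        return Thread.currentThread().getName();
    }

    // Hypothetical stand-in for a blocking REST call.
    static void slowBlockingCall() {
        try {
            Thread.sleep(30);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        run();
        System.out.println(websocketLog);
        System.out.println(notifierLog);
    }
}
```

Both lists should contain the same five values, with the first list tagged by a "ws-" thread and the second by a "notifier-" thread. A bounded elastic scheduler is chosen for the notifier branch because that scheduler type is intended for blocking work.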

Lagasse answered 19/11, 2019 at 12:8 Comment(6)
I managed to use ConnectableFlux / share, so the single Flux source is now guaranteed, but the problem is that sendNotification is called by the same thread that executes the websocket work. What I want to achieve is a separation of execution contexts for the subscribers. Is that possible? I feel I have to add subscribeOn somewhere, but I'm not sure... - Flamen
I've edited the body of my question to clarify this. - Flamen
@Flamen Yes, it's possible, but not with subscribeOn or publishOn, because WebClient and reactive WebSockets each have their own non-blocking threads, and your subscribeOn or publishOn would be overridden by their internal execution definition. I've added more code to my answer. - Lagasse
I tried, but either I get no response from the WebClient, or I end up putting a block after exchange or retrieve, which doesn't work because I get the following error: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-2. Should I explicitly subscribe to the WebClient call to fire the request (e.g. using block)? Or should the Mono returned by exchange be returned by the sendNotification method, and if so, what should be done with it there? Does this have something to do with the zip method you mentioned? - Flamen
Absolutely, you have to call subscribe to initiate reactive execution, so notifier.sendNotification(event).subscribe() will initiate it. You don't have to call subscribe on reactive method/operator chains returned from controllers because Spring calls it implicitly, but for all other executions you need to call it explicitly. The zipWith operator can be used for independent reactive executions when you need the results of multiple independent reactive methods downstream for further manipulation. - Lagasse
OK, now it works! With subscribe I'm able to fire the HTTP request, and I finally see the non-blocking behavior I wanted. However, contrary to what you said, I noticed slightly different behavior regarding execution contexts: for example, reactive websockets inherit and run on the thread pool set upstream by the Flux.interval I used in the MySource class (which is the parallel scheduler). Thank you. - Flamen
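
To illustrate the point from the comments that a Mono does nothing until something subscribes to it, here is a small sketch in plain Reactor. The sendNotification method below is a hypothetical stand-in that only increments a counter where the real code would return the WebClient call, and the condition v > 5 is invented for the example. It also shows one possible alternative to calling subscribe() per event: composing the calls with flatMap so that a single subscription drives all of them.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class LazyNotifierDemo {

    static final AtomicInteger sent = new AtomicInteger();

    // Hypothetical stand-in for Notifier.sendNotification: the work runs
    // only when the returned Mono is subscribed, on a bounded elastic thread.
    static Mono<Void> sendNotification(long value) {
        return Mono.fromRunnable(() -> sent.incrementAndGet())
                .subscribeOn(Schedulers.boundedElastic())
                .then();
    }

    // Returns { notifications sent before subscribing, notifications sent after }.
    static int[] run() {
        sent.set(0);
        Flux<Long> events = Flux.just(1L, 6L, 3L, 9L);

        // Assembling the pipeline does not send anything yet.
        Mono<Void> pipeline = events
                .filter(v -> v > 5) // invented condition
                .flatMap(LazyNotifierDemo::sendNotification)
                .then();
        int beforeSubscribe = sent.get();

        // Subscribing triggers the calls; block() is used here only because
        // this demo runs on a plain (non-reactive) thread.
        pipeline.block();
        return new int[] { beforeSubscribe, sent.get() };
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("before subscribe: " + result[0] + ", after: " + result[1]);
    }
}
```

In a handler running on a reactive thread you would call subscribe() on the pipeline, or return it as part of the chain, rather than block(), which is what the error quoted in the comments above is about.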

Note also that you are using a "cold" Flux to simulate events (Flux.interval(...)). In a real application, I think the events will come from some kind of Sink. Try playing with this code:

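import java.util.concurrent.atomic.AtomicLong;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

@Controller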
@Slf4j
public class ReactiveController {
    record SomeDTO(String name, String address) {
    }

    private final Sinks.Many<SomeDTO> eventSink = Sinks.many().replay().latest();

    @RequestMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<SomeDTO>> sse() {
        final AtomicLong counter = new AtomicLong(0);
        return eventSink.asFlux()
                .map(e -> ServerSentEvent.builder(e)
                        .id(counter.incrementAndGet() + "")
                        //.event(e.getClass().getName())
                        .build());
    }

    // Note: for this to work in production, make sure the HTTP request is not cached along the way, for example by using the POST method.
    @ResponseStatus(HttpStatus.OK)
    @ResponseBody
    @GetMapping(path = "/sendSomething", produces = MediaType.TEXT_PLAIN_VALUE)
    public String sendSomething() {
        this.eventSink.emitNext(
                new SomeDTO("name", "address"),
                (signalType, emitResult) -> {
                    log.warn("An event could not be sent to all subscribers. It will be dropped.");
                    // Return false so the emission is not retried.
                    return false;
                }
        );
        return "Have a look at /sse endpoint (using \"curl http://localhost/sse\" for example), to see events in realtime.";
    }
}
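
The difference between a cold source and a sink-backed one can be seen in a few lines without Spring. This is only a sketch: an incrementing counter stands in for the random value generator from the question, and a multicast sink is used here as an assumption (the controller above uses a replay sink).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

class ColdVsHotDemo {

    // Cold: the map function runs again for every subscription,
    // so each subscriber sees its own independently generated values.
    static List<List<Integer>> cold() {
        AtomicInteger generator = new AtomicInteger(); // stand-in for a random generator
        Flux<Integer> cold = Flux.range(0, 3).map(i -> generator.incrementAndGet());
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        cold.subscribe(first::add);
        cold.subscribe(second::add);
        return List.of(first, second);
    }

    // Hot: values are pushed into the sink once and multicast
    // to every subscriber attached at that moment.
    static List<List<Integer>> hot() {
        Sinks.Many<Integer> sink = Sinks.many().multicast().onBackpressureBuffer();
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        sink.asFlux().subscribe(first::add);
        sink.asFlux().subscribe(second::add);
        for (int i = 1; i <= 3; i++) {
            sink.tryEmitNext(i);
        }
        sink.tryEmitComplete();
        return List.of(first, second);
    }

    public static void main(String[] args) {
        System.out.println("cold: " + cold()); // cold: [[1, 2, 3], [4, 5, 6]]
        System.out.println("hot:  " + hot());  // hot:  [[1, 2, 3], [1, 2, 3]]
    }
}
```

The cold case mirrors the logs in the question, where the websocket handler and the notifier each received different values from what looked like the same Flux.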
Hostetter answered 13/10, 2022 at 12:34 Comment(0)
