What's the difference between creating a Flux directly by calling Flux.push and using the sink within push's lambda expression vs. using a sink provided by a DirectProcessor?
In a minimal example where a Flux just emits a couple of events, I could do
Flux.<String>push(emitter -> {
    emitter.next("One");
    emitter.next("Two");
    emitter.complete();
});
vs. using a DirectProcessor
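// Keep a reference to the processor: it is the Flux that subscribers attach to.
var processor = DirectProcessor.<String>create();
var emitter = processor.sink();
emitter.next("One");
emitter.next("Two");
emitter.complete();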
Just to clarify: I know that I could use Flux.just here, but my use case is actually building a bridge between Spring's @EventListeners and Spring WebFlux, where I want to create a Flux for every incoming SSE request for a specific resource and then publish events to this Flux.
Could anybody tell me whether both approaches are valid? Surely there must be some difference. In particular, the Reactor Reference Guide section on DirectProcessor states:
On the other hand, it has the limitation of not handling backpressure. As a consequence, a DirectProcessor signals an IllegalStateException to its subscribers if you push N elements through it but at least one of its subscribers has requested less than N.
What does that mean?
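As far as I understand the quoted sentence, it describes an emitter that has no buffer, so an element pushed without matching demand can only be turned into an error. The following is a minimal plain-JDK sketch of that reading, using java.util.concurrent.Flow instead of Reactor. DirectEmitter and DirectEmitterSketch are hypothetical names of mine, and this is a single-threaded model of the described behaviour, not how DirectProcessor is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class DirectEmitterSketch {

    /** Hypothetical emitter without a buffer (assumption: one subscriber, one thread). */
    static final class DirectEmitter<T> implements Flow.Publisher<T> {
        private Flow.Subscriber<? super T> subscriber;
        private long demand;   // elements requested but not yet delivered
        private boolean done;

        @Override
        public void subscribe(Flow.Subscriber<? super T> s) {
            this.subscriber = s;
            s.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { demand += n; }
                @Override public void cancel() { done = true; }
            });
        }

        void next(T value) {
            if (done || subscriber == null) {
                return;
            }
            if (demand == 0) {
                // Nothing was requested and there is nowhere to park the value.
                done = true;
                subscriber.onError(new IllegalStateException("no demand for " + value));
                return;
            }
            demand--;
            subscriber.onNext(value);
        }
    }

    /** Pushes two elements at a subscriber that requested only one. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        DirectEmitter<String> emitter = new DirectEmitter<>();
        emitter.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String item) { log.add("next:" + item); }
            @Override public void onError(Throwable t) { log.add("error:" + t.getClass().getSimpleName()); }
            @Override public void onComplete() { log.add("complete"); }
        });
        emitter.next("One");
        emitter.next("Two"); // second element exceeds the requested amount
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [next:One, error:IllegalStateException]
    }
}
```

If this reading is right, the question becomes whether an SSE client that requests slowly could trigger the same error in my bridge.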
[EDIT:] In an earlier version of the question I was using Flux.generate() instead of Flux.push(), which is obviously wrong, because generate can emit at most one event per invocation of its generator callback.
[EDIT 2:] @123 asked me for a full example of what I'm trying to achieve. Bear with me, it's a fair amount of code for an SO question:
Full example of what I'm actually trying to do
I'd like to build a bridge between a (non-reactive) Spring domain event listener and a reactive Flux, which I can then use in a WebFlux endpoint to publish SSEs. The following code snippets use Lombok annotations for brevity.
Let's assume that I eventually want to publish the state of a user in an onboarding process as SSEs. Here's the enum:
public enum ProcessState {
    CREATED(false),
    VERIFIED(false),
    AUTHORIZATION_PENDING(false),
    AUTHORIZED(false),
    ACTIVE(true);

    @Getter
    private final boolean terminalState;

    ProcessState(boolean terminalState) {
        this.terminalState = terminalState;
    }
}
The non-reactive business logic will publish StateChangedEvents whenever the state of any user is changed:
@Data
@RequiredArgsConstructor
public class StateChangedEvent {
    private final UUID userId;
    private final ProcessState newState;
}
And this is where my original question comes from. How would I build a bridge that translates these domain events into a Flux stream? My requirements:
- The current state of the process should be pushed as soon as a new client registers
- The Flux stream should terminate whenever a "terminal" onboarding state has been reached.
This is what I've got so far:
@Component
@RequiredArgsConstructor
class EventBridge {

    @RequiredArgsConstructor(access = PRIVATE)
    private static class Subscriber {
        private final UUID userId;
        private final FluxSink<ProcessState> sink;
        private boolean eventEmitted;
    }

    private final UserRepository repository;
    private final Map<UUID, Subscriber> subscribers = new ConcurrentHashMap<>();

    @EventListener
    void stateChanged(StateChangedEvent event) {
        notifySubscribers(event);
    }

    Flux<ProcessState> register(UUID userId) {
        return Flux.push(emitter -> addSubscriber(userId, emitter));
    }

    private Subscriber addSubscriber(UUID userId, FluxSink<ProcessState> sink) {
        var subscriptionId = randomUUID();
        var subscriber = new Subscriber(userId, sink);
        subscribers.put(subscriptionId, subscriber);
        sink
            .onRequest(n -> poll(subscriber))
            .onDispose(() -> removeSubscriber(subscriptionId));
        return subscriber;
    }

    private void poll(Subscriber subscriber) {
        emit(subscriber, loadCurrentState(subscriber), true);
    }

    private ProcessState loadCurrentState(Subscriber subscriber) {
        return repository.findById(subscriber.userId).getProcessState();
    }

    private void removeSubscriber(UUID subscriptionId) {
        subscribers.remove(subscriptionId);
    }

    private void notifySubscribers(StateChangedEvent event) {
        subscribers.values().stream()
            .filter(subscriber -> subscriber.userId.equals(event.getUserId()))
            .forEach(subscriber -> emit(subscriber, event.getNewState(), false));
    }

    private void emit(Subscriber subscriber, ProcessState processState, boolean onlyIfFirst) {
        synchronized (subscriber) {
            if (onlyIfFirst && subscriber.eventEmitted) {
                return;
            }
            subscriber.sink.next(processState);
            if (processState.isTerminalState()) {
                subscriber.sink.complete();
            }
            subscriber.eventEmitted = true;
        }
    }
}
And finally the controller, where the bridge is used:
@RestController
@RequiredArgsConstructor
class UserController {

    private final EventBridge eventBridge;

    @GetMapping(value = "/{userId}", produces = TEXT_EVENT_STREAM_VALUE)
    Flux<ServerSentEvent<ProcessState>> readAsStream(@PathVariable UUID userId) {
        return eventBridge.register(userId)
            .map(response -> ServerSentEvent.builder((ProcessState) response).build());
    }
}
There are a couple of issues in my bridge code I can't wrap my head around:
- Do I really have to synchronize on my Subscriber instance to avoid writing stale events from polling the initial state? If I don't, it does happen that a StateChange event arrives and gets published before the current state is read from the repository, which is then pushed out of order. Surely there must be a more elegant, Flux-ish way to handle this without the synchronized keyword.
- We already ruled out Flux.generate. It seems to work with Flux.push, but Flux.create generates a whole lot more SSE events. Why? I fear I don't understand the differences between the three.
- Rather than using the static methods on Flux, should I use a DirectProcessor or any other processor here? I'm new to the whole reactive stack and the Spring Reactor documentation is rather too vague for me, tbh. Again: What are the differences? What about that comment about back pressure I mentioned above?
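To make the ordering problem behind the synchronization question concrete, here is a minimal sketch that exercises the onlyIfFirst guard from emit in isolation, without Reactor or Spring. RecordingSubscriber and FirstEmissionGuardSketch are hypothetical stand-ins I made up for the Subscriber/FluxSink pair, states are plain strings, and the two interleavings are replayed sequentially rather than raced on real threads.

```java
import java.util.ArrayList;
import java.util.List;

class FirstEmissionGuardSketch {

    /** Hypothetical stand-in for Subscriber plus FluxSink: records what would be emitted. */
    static final class RecordingSubscriber {
        private final List<String> emitted = new ArrayList<>();
        private boolean eventEmitted;

        // Same rule as emit(...) in the bridge: a polled snapshot is only
        // accepted while nothing else has been emitted yet.
        synchronized void emit(String state, boolean onlyIfFirst) {
            if (onlyIfFirst && eventEmitted) {
                return; // stale snapshot, a newer event already went out
            }
            emitted.add(state);
            eventEmitted = true;
        }

        synchronized List<String> emitted() {
            return List.copyOf(emitted);
        }
    }

    /** The state change is published before the polled snapshot arrives. */
    static List<String> eventBeforeSnapshot() {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        subscriber.emit("VERIFIED", false); // from the event listener
        subscriber.emit("CREATED", true);   // from poll(), read earlier, now stale
        return subscriber.emitted();
    }

    /** The polled snapshot arrives first, then the state change. */
    static List<String> snapshotBeforeEvent() {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        subscriber.emit("CREATED", true);
        subscriber.emit("VERIFIED", false);
        return subscriber.emitted();
    }

    public static void main(String[] args) {
        System.out.println(eventBeforeSnapshot()); // [VERIFIED]
        System.out.println(snapshotBeforeEvent()); // [CREATED, VERIFIED]
    }
}
```

The guard only drops the stale snapshot because the check and the emission happen under one lock, which is exactly the part I would like to express with Reactor operators instead.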
Flux.generate isn't even valid, it should throw an exception. You can only use next once per pass. – Conformal
Flux.push instead of Flux.generate – Umbilicus
FluxSink, which in the Flux.push example is just wrapped in a Consumer. – Conformal