How to bridge between a non-reactive Spring EventListener and a reactive Flux

What's the difference between creating a Flux directly by calling Flux.push and using the sink within push's lambda expression, vs. using a sink provided by a DirectProcessor?

In a minimal example where a Flux just emits a couple of events, I could do

Flux.<String>push(emitter -> {
  emitter.next("One");
  emitter.next("Two");
  emitter.complete();
});

vs. using a DirectProcessor

var emitter = DirectProcessor.<String>create().sink();
emitter.next("One");
emitter.next("Two");
emitter.complete();

Just to clarify: I know that I could use Flux.just here, but my use case is actually building a bridge between Spring's @EventListeners and Spring WebFlux, where I want to create a Flux for every incoming SSE request for a specific resource and then publish events to this Flux.
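To make the difference between the two snippets concrete, here is a minimal sketch of the Flux.push variant (it assumes reactor-core on the classpath; the class and method names are illustrative). The lambda is not executed when the Flux is created but once per subscriber, and every subscriber gets its own emitter and all of the events. A DirectProcessor, by contrast, is hot: its single sink is shared by whoever happens to be subscribed when next() is called. In the DirectProcessor snippet above nobody has subscribed yet, so "One" and "Two" are dropped, and a later subscriber only sees the completion signal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

class PushIsCold {

    // Counts how many times the push lambda has been executed
    static final AtomicInteger lambdaRuns = new AtomicInteger();

    static Flux<String> pushFlux() {
        // Creating the Flux emits nothing: the lambda runs once per subscriber
        return Flux.<String>push(emitter -> {
            lambdaRuns.incrementAndGet();
            emitter.next("One");
            emitter.next("Two");
            emitter.complete();
        });
    }

    static List<String> run() {
        lambdaRuns.set(0);
        Flux<String> flux = pushFlux();
        List<String> received = new ArrayList<>();
        // Two independent subscriptions: each one gets its own emitter and all events
        received.addAll(flux.collectList().block());
        received.addAll(flux.collectList().block());
        return received;
    }

    public static void main(String[] args) {
        List<String> received = run();
        System.out.println(received + ", lambda runs: " + lambdaRuns.get());
        // prints: [One, Two, One, Two], lambda runs: 2
    }
}
```

For the SSE use case this means that a Flux.push-based bridge creates one emitter per SSE request. A processor-based bridge shares one sink between all requests, so each request would have to filter out the events that are relevant to it.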

Could anybody tell me whether both approaches are valid? Surely there must be some difference. In particular, the Reactor Reference Guide section on DirectProcessor states:

On the other hand, it has the limitation of not handling backpressure. As a consequence, a DirectProcessor signals an IllegalStateException to its subscribers if you push N elements through it but at least one of its subscribers has requested less than N.

What does that mean?
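The quoted sentence is about demand. With request(n), a subscriber signals how many elements it is ready to receive, and a publisher that does not buffer can only fail when it has more elements than that. The sketch below reproduces exactly that situation, assuming reactor-core on the classpath. DirectProcessor is deprecated as of Reactor 3.4 and removed in 3.5, so the sketch uses Flux.push with FluxSink.OverflowStrategy.ERROR instead. Like DirectProcessor, that strategy does not buffer, and it signals an overflow error (an IllegalStateException) when an element is pushed without outstanding demand. The subscriber requests N = 1 and the producer pushes 2.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

class OverflowDemo {

    static final List<String> received = new ArrayList<>();
    static Throwable failure;

    static void run() {
        received.clear();
        failure = null;

        // ERROR strategy: nothing is buffered, an element without demand is an error
        Flux<String> flux = Flux.<String>push(emitter -> {
            emitter.next("One");
            emitter.next("Two"); // N = 2 elements pushed...
            emitter.complete();
        }, FluxSink.OverflowStrategy.ERROR);

        flux.subscribe(new BaseSubscriber<String>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(1); // ...but only 1 requested
            }

            @Override
            protected void hookOnNext(String value) {
                received.add(value); // deliberately no further request()
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                failure = throwable;
            }
        });
    }

    public static void main(String[] args) {
        run();
        System.out.println("received " + received);
        System.out.println("overflow error: " + Exceptions.isOverflow(failure)
                + ", is IllegalStateException: " + (failure instanceof IllegalStateException));
    }
}
```

The Flux.push call in the question does not run into this because push defaults to OverflowStrategy.BUFFER, which queues the elements until they are requested.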

[EDIT:] In an earlier version of the question I was using Flux.generate() instead of Flux.push(), which is obviously wrong, because generate can emit at most one event per invocation of its generator callback.
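For reference, here is a minimal sketch of how Flux.generate is meant to be used (reactor-core assumed; names illustrative). Reactor invokes the generator callback once per element. The callback may call next() at most once per invocation, and it carries state (here, the index of the next element) from one invocation to the next. Calling next() twice in one invocation makes the Flux fail with an IllegalStateException.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class GenerateDemo {

    // Emits the given values one by one; the generator state is the next index
    static Flux<String> fromList(List<String> values) {
        return Flux.<String, Integer>generate(
                () -> 0,
                (index, sink) -> {
                    if (index < values.size()) {
                        sink.next(values.get(index)); // at most one next() per invocation
                    } else {
                        sink.complete();
                    }
                    return index + 1;
                });
    }

    // Calling next() twice in one invocation makes the Flux fail
    static Throwable twoNextCallsInOneInvocation() {
        try {
            Flux.<String>generate(sink -> {
                sink.next("One");
                sink.next("Two");
            }).collectList().block();
            return null;
        } catch (IllegalStateException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        System.out.println(fromList(List.of("One", "Two")).collectList().block()); // [One, Two]
        System.out.println(twoNextCallsInOneInvocation());
    }
}
```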

[EDIT 2:] @123 asked me for a full example of what I'm trying to achieve. Bear with me, it's a fair amount of code for an SO question:

Full example of what I'm actually trying to do

I'd like to build a bridge between a (non-reactive) Spring domain event listener and a reactive Flux, which I can then use in a WebFlux endpoint to publish SSEs. The following code snippets use Lombok annotations for brevity.

Let's assume that I eventually want to publish the state of a user in an onboarding process as SSEs. Here's the enum:

public enum ProcessState {
  CREATED(false),
  VERIFIED(false),
  AUTHORIZATION_PENDING(false),
  AUTHORIZED(false),
  ACTIVE(true);

  @Getter
  private final boolean terminalState;

  ProcessState(boolean terminalState) {
    this.terminalState = terminalState;
  }

}

The non-reactive business logic will publish StateChangedEvents whenever the state of any user is changed:

@Data
@RequiredArgsConstructor
public class StateChangedEvent {
  private final UUID userId;
  private final ProcessState newState;
}

And this is where my original question comes from: how would I build a bridge that translates these domain events into a Flux stream? My requirements:

  • The current state of the process should be pushed as soon as a new client registers
  • The Flux stream should terminate whenever a "terminal" onboarding state has been reached.

This is what I've got so far:

@Component
@RequiredArgsConstructor
class EventBridge {

  @RequiredArgsConstructor(access = PRIVATE)
  private static class Subscriber {
    private final UUID userId;
    private final FluxSink<ProcessState> sink;
    private boolean eventEmitted;
  }

  private final UserRepository repository;
  private final Map<UUID, Subscriber> subscribers = new ConcurrentHashMap<>();

  @EventListener
  void stateChanged(StateChangedEvent event) {
    notifySubscribers(event);
  }

  Flux<ProcessState> register(UUID userId) {
    return Flux.push(emitter -> addSubscriber(userId, emitter));
  }

  private Subscriber addSubscriber(UUID userId, FluxSink<ProcessState> sink) {
    var subscriptionId = randomUUID();
    var subscriber = new Subscriber(userId, sink);
    subscribers.put(subscriptionId, subscriber);
    sink
      .onRequest(n -> poll(subscriber))
      .onDispose(() -> removeSubscriber(subscriptionId));
    return subscriber;
  }

  private void poll(Subscriber subscriber) {
    emit(subscriber, loadCurrentState(subscriber), true);
  }

  private ProcessState loadCurrentState(Subscriber subscriber) {
    return repository.findById(subscriber.userId).getProcessState();
  }

  private void removeSubscriber(UUID subscriptionId) {
    subscribers.remove(subscriptionId);
  }

  private void notifySubscribers(StateChangedEvent event) {
    subscribers.values().stream()
      .filter(subscriber -> subscriber.userId.equals(event.getUserId()))
      .forEach(subscriber -> emit(subscriber, event.getNewState(), false));
  }

  private void emit(Subscriber subscriber, ProcessState processState, boolean onlyIfFirst) {
    synchronized (subscriber) {
      if (onlyIfFirst && subscriber.eventEmitted) {
        return;
      }
      subscriber.sink.next(processState);
      if (processState.isTerminalState()) {
        subscriber.sink.complete();
      }
      subscriber.eventEmitted = true;
    }
  }

}

And finally the controller, where the bridge is used:

@RestController
@RequiredArgsConstructor
class UserController {

  private final EventBridge eventBridge;

  @GetMapping(value = "/{userId}", produces = TEXT_EVENT_STREAM_VALUE)
  Flux<ServerSentEvent<ProcessState>> readAsStream(@PathVariable UUID userId) {
    return eventBridge.register(userId).map(response -> ServerSentEvent.builder((ProcessState) response).build());
  }

}

There are a couple of issues in my bridge code I can't wrap my head around:

  • Do I really have to synchronize on my Subscriber instance to avoid writing stale events from polling the initial state? If I don't, it can happen that a StateChangedEvent arrives and is published before the current state has been read from the repository; that (by then stale) state is then pushed out of order. Surely there must be a more elegant, Flux-ish way to handle this without the synchronized keyword.

  • We already ruled out Flux.generate. The bridge seems to work with Flux.push, but with Flux.create it emits a whole lot more SSE events. Why? I fear I don't understand the differences between the three.

  • Rather than using the static methods on Flux, should I use a DirectProcessor or some other processor here? I'm new to the whole reactive stack, and to be honest the Project Reactor documentation is rather too vague for me. Again: what are the differences? And what about the remark on backpressure I quoted above?
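On the generate/push/create question, here is a hedged summary of the Reactor javadoc. Flux.generate is synchronous and pull-driven, with one next() per callback invocation. Flux.push is intended for a single producer thread. Flux.create additionally allows next() to be called from several threads, because the sink it hands out serializes concurrent calls. The minimal sketch below (reactor-core assumed; names illustrative) shows only that last property, using two producer threads. It does not explain the additional SSE events observed with Flux.create.

```java
import reactor.core.publisher.Flux;

class CreateDemo {

    // Two threads call next() on the same sink concurrently; create's sink serializes them
    static Flux<Integer> twoProducerThreads(int perThread) {
        return Flux.<Integer>create(sink -> {
            Runnable producer = () -> {
                for (int i = 0; i < perThread; i++) {
                    sink.next(i);
                }
            };
            Thread first = new Thread(producer);
            Thread second = new Thread(producer);
            first.start();
            second.start();
            try {
                // complete only after both producers have finished
                first.join();
                second.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                sink.error(e);
                return;
            }
            sink.complete();
        });
    }

    public static void main(String[] args) {
        Long count = twoProducerThreads(10_000).count().block();
        System.out.println(count); // 20000: no element lost despite concurrent next() calls
    }
}
```

With Flux.push, the same two-thread producer would violate the single-producer contract, so push is only a safe choice for the bridge if events are always emitted from one thread at a time.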

Umbilicus asked on 1/4, 2020 at 6:47. Comments (6):
Conformal: That Flux.generate isn't even valid; it should throw an exception. You can only call next once per pass.
Umbilicus: Thanks for pointing that out. I rephrased my question using Flux.push instead of Flux.generate.
Conformal: Can you provide an example of what you are trying to do with them? The second example just creates a FluxSink, which in the Flux.push example is simply passed to a Consumer.
Umbilicus: I added full example code of what I've got so far.
Conformal: Cheers. I don't have time today, but I'll have a look at it tomorrow.
Conformal: I've posted how I would approach it below. Regarding your sub-questions, I think they may warrant new questions, as they would likely need code examples to explain properly.

If I understand what you are trying to do correctly, I think your solution could be simplified considerably.

@Component
public class EventBridge {

    private final UserRepository repository;
    private final ReplayProcessor<StateChangedEvent> processor;
    private final FluxSink<StateChangedEvent> sink;

    EventBridge(UserRepository repository) {
        this.repository = repository;
        //Replays the events of the last 100 seconds to every new subscriber
        this.processor = ReplayProcessor.createTimeout(Duration.ofSeconds(100L));
        //Sink provides thread-safe next, complete and error for subscribers
        this.sink = processor.sink();
    }

    @EventListener
    public void changeState(StateChangedEvent event) {
        //Literally just pass the event into the sink, which calls onNext on the subscribers
        sink.next(event);
    }

    public Flux<ProcessState> streamProcessStateForUser(UUID uuid) {
        return Flux.concat(
                //Search the repository first. This isn't great, since it blocks until
                //the repo returns (and, because Flux.just takes an already computed
                //value, it does so as soon as this method is called), although that
                //seems to be what you want.
                //Also I added an UNKNOWN to ProcessState, since it's better than
                //it being null, and you should probably return an Optional from the repo.
                Flux.just(
                        repository.findById(uuid).map(User::getProcessState).orElse(ProcessState.UNKNOWN)
                ),
                processor
                        //Only keep events for this user
                        .filter(stateChangedEvent -> stateChangedEvent.getUserId().equals(uuid))
                        //Time out after 100 seconds, not needed but may be useful for you
                        .take(Duration.ofSeconds(100L))
                        //Complete the flux when a terminal state is reached
                        .takeUntil(stateChangedEvent -> stateChangedEvent.getNewState().isTerminalState())
                        //Convert from StateChangedEvent to ProcessState
                        .map(StateChangedEvent::getNewState)
        );
    }

}

You should be able to keep everything else the same; the controller just has to call streamProcessStateForUser instead of register.
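The sketch below is a hedged variant of this bridge that folds in the fixes discussed in the comments. It assumes Reactor 3.4+, where the Sinks API replaces ReplayProcessor; on 3.3, ReplayProcessor.createTimeout(...) plus sink() as above plays the same role. The types are trimmed down, UNKNOWN is added as in the answer, and the repository is replaced by a hypothetical Function<UUID, Optional<ProcessState>>. It differs from the answer in three ways:

- Flux.defer queries the repository per subscription.
- distinctUntilChanged removes the duplicated first state mentioned in the comments. It is used here instead of distinct(), which would also suppress a state that legitimately reappears and keeps every seen value in memory.
- takeUntil comes after the concat, so a user whose stored state is already terminal gets a stream that completes.

One caveat remains: replayed events that are older than the repository read can still be emitted after it, so this does not fully solve the ordering problem from the question.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

class ReplayBridgeSketch {

    // Trimmed-down versions of the question's types
    enum ProcessState {
        UNKNOWN(false), CREATED(false), VERIFIED(false), ACTIVE(true);

        private final boolean terminalState;

        ProcessState(boolean terminalState) {
            this.terminalState = terminalState;
        }

        boolean isTerminalState() {
            return terminalState;
        }
    }

    static final class StateChangedEvent {
        final UUID userId;
        final ProcessState newState;

        StateChangedEvent(UUID userId, ProcessState newState) {
            this.userId = userId;
            this.newState = newState;
        }
    }

    // Hypothetical stand-in for repository.findById(..).map(User::getProcessState)
    private final Function<UUID, Optional<ProcessState>> currentState;

    // Replays the events of the last 100 seconds to every new subscriber
    private final Sinks.Many<StateChangedEvent> events =
            Sinks.many().replay().limit(Duration.ofSeconds(100));

    ReplayBridgeSketch(Function<UUID, Optional<ProcessState>> currentState) {
        this.currentState = currentState;
    }

    // The @EventListener method in the Spring version; synchronized because a
    // Sinks.Many rejects concurrent emissions (tryEmitNext returns FAIL_NON_SERIALIZED)
    synchronized void changeState(StateChangedEvent event) {
        events.tryEmitNext(event).orThrow();
    }

    Flux<ProcessState> streamProcessStateForUser(UUID userId) {
        // defer: the repository is queried once per subscription, not when this method runs
        return Flux.defer(() -> Flux.concat(
                        Mono.fromCallable(() -> currentState.apply(userId).orElse(ProcessState.UNKNOWN)),
                        events.asFlux()
                                .filter(event -> event.userId.equals(userId))
                                .map(event -> event.newState)))
                // drops a replayed event that repeats the state just read from the repository
                .distinctUntilChanged()
                // applied after concat, so an already terminal state completes the stream too
                .takeUntil(ProcessState::isTerminalState)
                // safety net, like the 100 second timeout in the answer
                .take(Duration.ofSeconds(100));
    }

    public static void main(String[] args) {
        UUID user = UUID.randomUUID();
        ReplayBridgeSketch bridge = new ReplayBridgeSketch(id -> Optional.of(ProcessState.VERIFIED));
        bridge.changeState(new StateChangedEvent(user, ProcessState.VERIFIED));
        bridge.changeState(new StateChangedEvent(UUID.randomUUID(), ProcessState.CREATED));
        bridge.changeState(new StateChangedEvent(user, ProcessState.ACTIVE));
        System.out.println(bridge.streamProcessStateForUser(user).collectList().block());
    }
}
```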

Conformal answered on 8/4, 2020 at 9:22. Comments (9):
Umbilicus: I'll try that in a minute, but to clarify: why do I need a ReplayProcessor with a timeout of 100 seconds? If I don't care about past events, what would I use?
Umbilicus: And by the way, blocking is fine. I'm dealing with user events that happen on a scale of minutes, not milliseconds.
Umbilicus: One more thing: I can't post the "real" use case here, but yes, we do have an UNKNOWN state, and yes, our repos return Optionals. Thanks for pointing that out, though :)
Umbilicus: Almost happy, but the first state is now published twice. I assume once from reading the repo and a second time from the ReplayProcessor.
Conformal: @StefanHaberl I suggested the ReplayProcessor because without it you will drop any states that come through while the repo is being searched. In your case I would probably use a DirectProcessor. You should be able to just switch out parts to better suit your use case, in situ, i.e. this.processor = DirectProcessor.create()
Conformal: Oh, and yes, you'd also have to change the instance variable's type back to DirectProcessor, like you've done in the edit!
Umbilicus: DirectProcessor doesn't cut it for me, because I might again lose the "terminal" event, causing the Flux stream to block until it times out after 100 seconds. So ReplayProcessor it is. Two more things: (1) the duplicated publication I mentioned in a comment above is easy to fix: just add distinct() to the concatenated Flux; and (2) the "blocking" nature of the first Flux.just is also easy to fix by replacing the outer Flux.concat with a Flux.merge.
Umbilicus: Thanks again @Conformal for pointing me in the right direction. After some thought I switched to a completely different, simpler solution. I'll update the question title and add a separate answer for future reference.
Conformal: @StefanHaberl Regarding (2), it will (or should) still block even if you use merge, since the repo call is not reactive. Merge just subscribes eagerly to each source, so the results are interleaved as they arrive. Since Flux.just is used, though, it effectively works the same as concat, with all elements from just coming first. Regarding (1), I'd completely forgotten that existed, nice find!
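The point about Flux.just can be checked directly. The following is a minimal sketch (reactor-core assumed; loadState is a hypothetical stand-in for the repository call). The argument of Flux.just is evaluated when the Flux is assembled, i.e. when streamProcessStateForUser is called, so swapping concat for merge cannot change when the repository is hit. Mono.fromCallable (or Flux.defer) moves the call to subscription time and repeats it for each subscriber. The call still blocks whichever thread subscribes unless it is combined with subscribeOn.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class EagerVsLazy {

    static final AtomicInteger repositoryCalls = new AtomicInteger();

    // Hypothetical stand-in for the blocking repository lookup
    static String loadState() {
        repositoryCalls.incrementAndGet();
        return "VERIFIED";
    }

    static int[] run() {
        repositoryCalls.set(0);
        // The argument of Flux.just is evaluated right here, before anyone subscribes
        Flux<String> eager = Flux.just(loadState());
        int eagerAfterAssembly = repositoryCalls.get();
        eager.blockLast();
        eager.blockLast();
        int eagerAfterTwoSubscriptions = repositoryCalls.get();

        repositoryCalls.set(0);
        // fromCallable postpones the call until subscription, and repeats it per subscriber
        Flux<String> lazy = Mono.fromCallable(EagerVsLazy::loadState).flux();
        int lazyAfterAssembly = repositoryCalls.get();
        lazy.blockLast();
        lazy.blockLast();
        int lazyAfterTwoSubscriptions = repositoryCalls.get();

        return new int[] {eagerAfterAssembly, eagerAfterTwoSubscriptions,
                          lazyAfterAssembly, lazyAfterTwoSubscriptions};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [1, 1, 0, 2]
    }
}
```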

Thanks @123 for answering my question about how to build a bridge between Spring's @EventListener and a Flux. As mentioned in the question, the complete use case was to push the domain events to a client using WebFlux's SSE support.

After a little bit of thinking, I realized that it doesn't make sense to build this bridge in the first place, because in a multi-instance scenario the HTTP request might hit a different instance than the one where the onboarding process is running, and therefore no events will be pushed at all.

So in the end, I opted to poll the single source of truth (the database) and push SSE events whenever the state changes. It would be great if we could use a reactive data store here, but for now I'm "stuck" with Spring Data JPA and PostgreSQL.

So, if somebody has the same problem, this is how I built it in the end:

@RestController
@RequiredArgsConstructor
class UserController {

  private final UserRepository userRepository;

  @GetMapping(value = "/{userId}", produces = TEXT_EVENT_STREAM_VALUE)
  Flux<ServerSentEvent<ProcessState>> readAsStream(@PathVariable UUID userId) {
    return Flux.interval(ZERO, ofSeconds(1L))
      .map(n -> userRepository.findById(userId).getProcessState())
      .takeUntil(ProcessState::isTerminalState)
      .distinctUntilChanged()
      .map(state -> ServerSentEvent.builder(state).build())
      .take(ofMinutes(30L));
  }

}

Just in case somebody is wondering: this is again simplified to illustrate the problem at hand. We have a hexagonal architecture, i.e., we don't inject repositories into our @RestControllers but call a business facade (a.k.a. input port) from our web layer to retrieve users.
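One detail in the polling version concerns threading. Flux.interval emits on Reactor's parallel scheduler, so the map step runs the blocking JPA call on one of that scheduler's threads, which it is not meant for. Below is a minimal sketch of a variant, assuming reactor-core 3.3+ (for Schedulers.boundedElastic()), a trimmed-down enum, and a hypothetical Supplier<ProcessState> in place of the repository. Each lookup is wrapped in Mono.fromCallable, moved to boundedElastic with subscribeOn, and run one at a time via concatMap. onBackpressureDrop guards against the overflow error that interval signals when downstream cannot keep up.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class PollingSketch {

    // Trimmed-down version of the question's enum
    enum ProcessState {
        CREATED(false), VERIFIED(false), ACTIVE(true);

        private final boolean terminalState;

        ProcessState(boolean terminalState) {
            this.terminalState = terminalState;
        }

        boolean isTerminalState() {
            return terminalState;
        }
    }

    // loadState is a hypothetical stand-in for userRepository.findById(userId).getProcessState()
    static Flux<ProcessState> pollState(Supplier<ProcessState> loadState, Duration period, Duration maxDuration) {
        return Flux.interval(Duration.ZERO, period)
                // interval fails if downstream can't keep up; drop ticks instead
                .onBackpressureDrop()
                // one blocking lookup at a time, on a scheduler meant for blocking work
                .concatMap(tick -> Mono.fromCallable(loadState::get)
                        .subscribeOn(Schedulers.boundedElastic()))
                // only emit when the state actually changes
                .distinctUntilChanged()
                // complete after emitting a terminal state
                .takeUntil(ProcessState::isTerminalState)
                .take(maxDuration);
    }

    public static void main(String[] args) {
        ProcessState[] answers = {ProcessState.CREATED, ProcessState.CREATED, ProcessState.VERIFIED, ProcessState.ACTIVE};
        AtomicInteger calls = new AtomicInteger();
        // Simulated repository: the next answer on every call, then it stays at the last one
        Supplier<ProcessState> fakeRepository =
                () -> answers[Math.min(calls.getAndIncrement(), answers.length - 1)];
        System.out.println(pollState(fakeRepository, Duration.ofMillis(10), Duration.ofSeconds(5))
                .collectList().block()); // [CREATED, VERIFIED, ACTIVE]
    }
}
```

In the controller, the result would then be mapped to ServerSentEvents exactly as in the answer.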

Umbilicus answered on 9/4, 2020 at 7:12. Comments (1):
Conformal: No worries! Out of curiosity, how do you update the repository? Whatever does that could relay the event to a topic listened to by all relevant instances. Then you wouldn't have to poll the repo repeatedly for 30 minutes.
