How to cancel an ongoing Spring Flux?

I'm using a Reactor Flux in a Spring application to send parallel requests to a service. This is a very simplified version of it:

Flux.fromIterable(customers)
  .flatMap { customer ->
     client.call(customer)
  } ...

I was wondering how I could cancel this flux, as in, grab a reference to the flux somehow and tell it to shut down.

Explanatory answered 10/8, 2018 at 12:16 Comment(0)

As you probably know, with reactive objects, all operators are lazy. This means execution of the pipeline is delayed until the moment you subscribe to the reactive stream.

So, in your example, there is nothing to cancel yet because nothing is happening at that point.
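To make the laziness concrete, here is a minimal sketch. The `call` method is a hypothetical stand-in for `client.call(customer)` that only counts invocations, and the customer names are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class LazyPipelineDemo {

    // Hypothetical stand-in for client.call(customer): counts how often it runs.
    static Mono<String> call(String customer, AtomicInteger calls) {
        return Mono.fromCallable(() -> {
            calls.incrementAndGet();
            return "response for " + customer;
        });
    }

    // Returns {calls before subscribe, calls after subscribe}.
    static int[] run() {
        AtomicInteger calls = new AtomicInteger();
        Flux<String> pipeline = Flux.fromIterable(List.of("a", "b", "c"))
                .flatMap(customer -> call(customer, calls));

        int before = calls.get();  // assembling the pipeline triggers no calls
        pipeline.subscribe();      // no schedulers involved, so this runs synchronously
        int after = calls.get();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("calls before subscribe: " + counts[0]);
        System.out.println("calls after subscribe: " + counts[1]);
    }
}
```

In this sketch the counter is 0 before `subscribe()` and 3 afterwards, because the pipeline has no asynchronous boundary and completes on the calling thread.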

But supposing your example was extended to:

Disposable disp = Flux.fromIterable(customers)
  .flatMap(customer -> client.call(customer))
  .subscribe();

Then, as you can see, your subscription returns a Disposable object that you can use to cancel the entire thing if you want, e.g.

disp.dispose();

The documentation of Disposable.dispose() says:

Cancel or dispose the underlying task or resource.

There’s another section of the documentation that says the following:

These variants [of operators] return a reference to the subscription that you can use to cancel the subscription when no more data is needed. Upon cancellation, the source should stop producing values and clean up any resources it created. This cancel and clean-up behavior is represented in Reactor by the general-purpose Disposable interface.

Canceling a stream therefore has consequences on the source side: if you cancel in the middle of processing, you want to leave the world in a consistent state. For example, if you were in the process of building something, you may need to discard partial aggregation results, close files and channels, release memory or other resources, and possibly undo or compensate for changes already made.

You may want to read the documentation on cleanup, so that you also consider what you can do on the source side.

Flux<String> bridge = Flux.create(sink -> {
    sink.onRequest(n -> channel.poll(n))
        .onCancel(() -> channel.cancel())  // invoked first, for the cancel signal only
        .onDispose(() -> channel.close()); // invoked for complete, error, or cancel signals
});
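The snippet above depends on a `channel` object that is not shown. As a self-contained sketch of the same hooks, the following records which cleanup callbacks fire when a subscription is disposed. The event names and the `doFinally` step are my own additions for illustration, not part of the documentation example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

class CancelCleanupDemo {

    // Returns the cleanup events recorded after disposing the subscription.
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();

        // A source that never emits or completes; it only registers cleanup hooks.
        Flux<String> source = Flux.<String>create(sink -> {
            sink.onCancel(() -> events.add("onCancel"));    // cancel signal only
            sink.onDispose(() -> events.add("onDispose"));  // complete, error, or cancel
        }).doFinally(signal -> events.add("doFinally:" + signal.name()));

        Disposable disposable = source.subscribe();
        disposable.dispose();  // propagates a cancel signal up to the source
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Here the `onCancel` hook runs before the `onDispose` hook, and `doFinally` sees the `CANCEL` signal type, so any of the three is a reasonable place for cleanup logic depending on whether it should also run on normal completion.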
Weatherboard answered 10/8, 2018 at 12:59 Comment(1)
How can I dispose() the flux right after the subscribe operations are completed? - Gustafsson

The answer from @Edwin is accurate. As long as you don't call subscribe, there is nothing to cancel, because no code will be executed.
I just wanted to add an example to make it clear.

public static void main(String[] args) throws InterruptedException {

   // Lists.newArrayList comes from Guava
   List<String> lists = Lists.newArrayList("abc", "def", "ghi");
   Disposable disposable = Flux.fromIterable(lists)
                           .delayElements(Duration.ofSeconds(3))
                           .map(String::toLowerCase)
                           .subscribe(System.out::println);

   Thread.sleep(5000); // Sleep long enough for the first element ("abc") to be printed
   disposable.dispose();

   Thread.sleep(10000); // Keep waiting to show that nothing more is printed after the flux is cancelled
}

But I would say a much cleaner (more functional) way is to use operators like takeUntil or take. For instance, I can also stop the stream in the example above like this:

List<String> lists = Lists.newArrayList("abc", "def", "End", "ghi");
// takeUntil also relays the matching element, so this prints abc, def, end
Flux.fromIterable(lists)
    .takeUntil(s -> s.equalsIgnoreCase("End"))
    .delayElements(Duration.ofSeconds(3))
    .map(String::toLowerCase)
    .subscribe(System.out::println);

or

List<String> lists = Lists.newArrayList("abc", "def", "ghi");
Flux.fromIterable(lists)
    .take(2)
    .delayElements(Duration.ofSeconds(2))
    .map(String::toLowerCase)
    .subscribe(System.out::println);
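If the stop condition comes from outside the stream rather than from an element or a count, a related operator is takeUntilOther, which relays values until another Publisher emits. The following is a minimal sketch under a few assumptions: Reactor 3.4 or later (for the Sinks API), a made-up `stop` signal, and `Flux.interval` standing in for the real source.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

class StopSignalDemo {

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns {elements seen when the stop signal was sent, elements seen a while later}.
    static int[] run() {
        Sinks.One<Boolean> stop = Sinks.one();  // hypothetical external stop signal
        List<Long> seen = new CopyOnWriteArrayList<>();

        Flux.interval(Duration.ofMillis(100))   // stand-in for the real source
            .takeUntilOther(stop.asMono())      // completes once the stop signal emits
            .subscribe(seen::add);

        sleep(350);                             // let a few ticks through
        stop.tryEmitValue(true);                // tell the stream to shut down
        int atStop = seen.size();

        sleep(300);                             // no further ticks should arrive
        return new int[] {atStop, seen.size()};
    }

    public static void main(String[] args) {
        int[] sizes = run();
        System.out.println("seen at stop: " + sizes[0] + ", seen later: " + sizes[1]);
    }
}
```

Unlike dispose(), this ends the stream with a normal completion signal, so downstream onComplete handlers still run.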
Kroo answered 10/8, 2018 at 13:6 Comment(0)

Subscribing to my flux again and then calling dispose did it for me:

// Setup flux and populate
Flux<String> myFlux = controller.get(json);

// Subscribe
FlowSubscriber<String> sub = new FlowSubscriber<String>();       
myFlux.subscribe(sub);

// Work on elements in the subscription
String myString = sub.consumedElements.get(0);
... do work ...

// Cancel
myFlux.subscribe().dispose();
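Whether this works depends on how the flux returned by `controller.get(json)` is implemented. For a plain cold Flux, every subscribe() call starts an independent subscription, so disposing a fresh one leaves earlier subscribers running. A minimal sketch of that behavior, using `Flux.interval` as an assumed stand-in source:

```java
import java.time.Duration;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

class IndependentSubscriptionsDemo {

    // Returns {first still active after disposing a second subscription,
    //          first disposed after calling dispose() on it directly}.
    static boolean[] run() {
        Flux<Long> cold = Flux.interval(Duration.ofMillis(100));

        Disposable first = cold.subscribe();
        cold.subscribe().dispose();  // a second, separate subscription is cancelled

        boolean firstStillActive = !first.isDisposed();
        first.dispose();             // cancelling the first needs its own Disposable
        return new boolean[] {firstStillActive, first.isDisposed()};
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("first still active: " + result[0]);
        System.out.println("first disposed after dispose(): " + result[1]);
    }
}
```

For a source like this one, keeping the Disposable returned by the original subscribe() call (or having the subscriber implement Disposable) is the reliable way to cancel it.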
Pion answered 11/1, 2022 at 18:23 Comment(0)
