block()/blockFirst()/blockLast() are blocking error when calling bodyToMono AFTER exchange()
I am trying to use WebFlux to stream a generated file to another location. However, if generating the file fails, the API still returns success, but with a DTO detailing the errors instead of the file itself. This is a very old and poorly designed API, so please excuse the use of POST and the API design.

The response from the API call (exchange()) is a ClientResponse. From here I can either convert it to a ByteArrayResource using bodyToMono, which can be streamed to a file, or, if there was an error creating the file, convert it to the DTO, also using bodyToMono. However, I cannot seem to do one or the other depending on the contents of the ClientResponse headers.

At runtime I get an IllegalStateException caused by

block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-client-epoll-12

I think my issue is that I cannot call block() twice in the same function chain.

My code snippet is like so:

webClient.post()
        .uri(uriBuilder -> uriBuilder.path("/file/")
                                      .queryParams(params).build())
        .exchange()
        .doOnSuccess(cr -> {
                if (MediaType.APPLICATION_JSON_UTF8.equals(cr.headers().contentType().get())) {
                    NoPayloadResponseDto dto = cr.bodyToMono(NoPayloadResponseDto.class).block();
                    createErrorFile(dto);
                }
                else {
                    ByteArrayResource bAr = cr.bodyToMono(ByteArrayResource.class).block();
                    createSpreadsheet(bAr);
                }
            }
        )
        .block();

Basically I want to process the ClientResponse differently based on the MediaType which is defined in the header.

Is this possible?

Melanymelaphyre answered 20/7, 2018 at 20:26 Comment(2)
Don't block, subscribe. There should be no reason to call block. If you are using WebFlux you are doing so because you want to build a reactive pipeline; if you are calling block you are not doing that. Put another way, if you are calling block, just use a plain old RestTemplate. Your code looks very procedural and side-effecty anyway, so shoehorning it into Reactor won't make it magically reactive. - Karmen
To clarify, we are trying to stream a file to disk from a web API using WebClient. The response could be either 200 OK application/json when there are errors, or 200 OK Content-Disposition when there are no errors. How would we do that using WebClient without loading the file entirely into memory? - Chaetognath
C
60

First, a few things that will help you understand the code snippet solving this use case.

  1. You should never call a blocking method within a method that returns a reactive type; doing so blocks one of the few event-loop threads your application has, which can stall every request that shares that thread
  2. In any case, as of Reactor 3.2, calling block() on a thread that Reactor treats as non-blocking (such as the reactor-http-client-epoll thread in your stack trace) throws the IllegalStateException you are seeing
  3. Calling subscribe, as suggested in the comments, is not a good idea either. It is more or less like starting that job as a task in a separate thread. You'll get a callback when it's done (the subscribe methods can be given lambdas), but you are in fact decoupling your current pipeline from that task. In this case, the client HTTP response could be closed and its resources cleaned up before you get a chance to read the full response body and write it to a file
  4. If you don't want to buffer the whole response in memory, Spring provides DataBuffer (think ByteBuffer instances that can be pooled).
  5. You can call block if the method you're implementing is itself blocking (returning void, for example), such as in a test case.

Here's a code snippet that you could use to do this:

Mono<Void> fileWritten = WebClient.create().post()
        .uri(uriBuilder -> uriBuilder.path("/file/").build())
        .exchange()
        .flatMap(response -> {
            if (MediaType.APPLICATION_JSON_UTF8.equals(response.headers().contentType().get())) {
                Mono<NoPayloadResponseDto> dto = response.bodyToMono(NoPayloadResponseDto.class);
                return createErrorFile(dto);
            }
            else {
                Flux<DataBuffer> body = response.bodyToFlux(DataBuffer.class);
                return createSpreadsheet(body);
            }
        });
// Once you get that Mono, you should plug it into an existing
// reactive pipeline, or call block on it, depending on the situation

As you can see, we're not blocking anywhere and methods dealing with I/O are returning Mono<Void>, which is the reactive equivalent of a done(error) callback that signals when things are done and if an error happened.

Since I'm not sure what the createErrorFile method should do, I've provided a sample for createSpreadsheet that just writes the body bytes to a file. Note that since databuffers might be recycled/pooled, we need to release them once we're done.

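private Mono<Void> createSpreadsheet(Flux<DataBuffer> body) {
    try {
        Path file = //...
        // CREATE lets the call succeed when the file does not exist yet
        WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        return DataBufferUtils.write(body, channel)
                .map(DataBufferUtils::release) // buffers may be pooled, so release each one once written
                .then()
                // close the channel on completion, error, or cancellation
                .doFinally(signal -> {
                    try { channel.close(); } catch (IOException ignored) { }
                });
    } catch (IOException exc) {
        return Mono.error(exc);
    }
}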

With this implementation, your application will hold a few DataBuffer instances in memory at a given time (the reactive operators are prefetching values for performance reasons) and will write bytes as they come in a reactive fashion.
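
To make the write-as-it-arrives idea concrete without Reactor, here is a minimal JDK-only sketch. ChunkedFileWriter and writeChunks are hypothetical names, and a List<ByteBuffer> stands in for the Flux<DataBuffer>. It is not how DataBufferUtils is implemented, only the same pattern done imperatively: write each chunk as it is handed over, and close the channel whether or not the write succeeded.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical helper for illustration; not part of Spring or Reactor.
final class ChunkedFileWriter {

    // Writes each chunk to the file in order and returns the total number of bytes written.
    static long writeChunks(Path file, List<ByteBuffer> chunks) {
        // try-with-resources closes the channel on success and on failure
        try (WritableByteChannel channel = Files.newByteChannel(file,
                StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING,
                StandardOpenOption.WRITE)) {
            long written = 0;
            for (ByteBuffer chunk : chunks) {
                // a single write() call may not drain the buffer, so loop until it is empty
                while (chunk.hasRemaining()) {
                    written += channel.write(chunk);
                }
            }
            return written;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("spreadsheet", ".bin");
        long n = writeChunks(file, List.of(
                ByteBuffer.wrap("hello ".getBytes(StandardCharsets.UTF_8)),
                ByteBuffer.wrap("world".getBytes(StandardCharsets.UTF_8))));
        System.out.println(n + " bytes: " + Files.readString(file));
        Files.delete(file);
    }
}
```

Only one chunk needs to be in hand at a time, which is the property the reactive version keeps while also avoiding a blocked thread.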

Carter answered 21/7, 2018 at 12:25 Comment(8)
What about bodyToMono(Resource.class)? Shouldn't that take care of the buffering already? - Chaetognath
The ResourceDecoder only supports in-memory variants such as InputStreamResource.class or ByteArrayResource.class. Because you're giving a class to bodyToMono and not an instance, you can't really ask it to write to an existing file. - Carter
So then bodyToMono(ByteArrayResource.class) will also load the entire file into memory and not stream the bytes to the file system? Wouldn't an ExchangeFilterFunction be better suited than creating a separate method like createSpreadsheet? - Chaetognath
I don't understand how you're expecting that exchange function to work. Please ask a new question showing what you have in mind. - Carter
This solution keeps the file handle open in Linux. The file gets written as 0 KB and has tons of child processes handling the file. I could not find a way to close the file, for example with channel.close(). At some point, opening the Excel file in Windows said it was already open. - Chaetognath
What if I want to throw an exception depending on the HTTP response status code and get the exception message from the response DTO? I would need to get the DTO inside onStatus(...) using block(), which leads to the same block()/blockFirst()/blockLast() exception. - Reft
"Calling subscribe, as suggested in the comments, is not a good idea either" - That's wrong, I believe; otherwise how will fileWritten be consumed? Either block() or subscribe() is necessary. - Accumulate
@AnnaKlein In the context of this question (calling this code from within a Controller method), calling subscribe is likely to cause the problems listed in the answer. I agree that .subscribe() by itself is not forbidden, but this method is usually called by a Subscriber, not application code. - Carter
C
39

[UPDATE 2021/10/19]

toProcessor() is now deprecated.

Consider using

myMono.toFuture().get();

As stated in the most voted answer, one should never block. In my case, that is the only option as we are using a reactive library within an imperative piece of code. The blocking can be done by wrapping the mono in a processor:

myMono.toProcessor().block()
Caithness answered 13/10, 2020 at 15:4 Comment(5)
This is a deprecated method. The docs ask us to use share. - Nil
myMono.toFuture().get(); takes forever and the application never executes the next line of code. I tried it on POST and GET with multiple working URLs. Any idea? - Ish
toFuture().get() is a blocking operation and is technically the same as block(). It will not raise the "block()/blockFirst()/blockLast() are blocking, which is not supported in thread ..." exception because it is outside the Reactor API's control, but the problem remains: the operation still blocks, and you have just hidden the issue. There is no reason to block in such cases. - Irritability
It's pending forever. - Saracen
Use share() instead of get(). - Menswear
D
26

To execute Client Request outside the Server Request pool, use myWebClientMono.share().block();

Disloyal answered 4/3, 2021 at 13:8 Comment(5)
What does share() do exactly? - Jaquith
It will execute a blocking call outside the current worker's pool. - Disloyal
This unfortunately didn't work for me. I'm using Spring Boot 2.4.5. Any idea? - Ish
@AntonSeredkin Thanks a ton. It worked for me after trying different things for more than a week. My error message read "...not supported in thread parallel-2". Can you elaborate on what "blocking call outside the current worker's pool" means? - Fike
In my case the request just hangs once the response is received. - Reft
W
2

Try myMono.subscribeOn(Schedulers.boundedElastic()).toFuture().get(5L, TimeUnit.SECONDS)
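
Mono.toFuture() returns a java.util.concurrent.CompletableFuture, so the timeout part of that call is plain JDK behaviour. The sketch below uses only the JDK (no Reactor); BoundedWait and getOrFallback are hypothetical names, and a future that never completes stands in for a Mono that never emits. It shows why a bounded get(timeout, unit) returns control to the caller, whereas a bare get() on such a future would wait indefinitely, which may be what the "pending forever" reports elsewhere in this thread ran into.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration only.
final class BoundedWait {

    // Waits up to timeoutMillis for the future; returns fallback if it does not complete in time.
    static <T> T getOrFallback(CompletableFuture<T> future, long timeoutMillis, T fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // give up on the pending result
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return fallback;
        } catch (ExecutionException e) {
            throw new IllegalStateException("source failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();
        System.out.println(getOrFallback(neverCompletes, 100, "timed out"));
        System.out.println(getOrFallback(CompletableFuture.completedFuture("body"), 100, "timed out"));
    }
}
```

The calling thread is still blocked for up to the timeout, so this only bounds the wait; it does not make the call non-blocking.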

Wilkens answered 11/1, 2023 at 17:17 Comment(0)
A
2

[UPDATE 2023/01/31]

I would like to add to this topic and share my solution, since the exchange() operator has been deprecated since version 5.3.

Details:

Deprecated. since 5.3 due to the possibility to leak memory and/or connections; please, use exchangeToMono(Function), exchangeToFlux(Function); consider also using retrieve() which provides access to the response status and headers via ResponseEntity along with error status handling.

So, I'll give an example for this task that uses the retrieve() operator and somewhat simplifies saving the file to the file system while keeping the streaming approach.

As it gives us an opportunity to access both headers and response body, we could do something like this:

Mono<Void> fileWritten = webClient.get()
        .uri(uriBuilder -> uriBuilder.path("/file/").build())
        .retrieve()                         // using retrieve since exchange() is deprecated
        .toEntityFlux(DataBuffer.class)     // return Mono<ResponseEntity<Flux<DataBuffer>>>
        .flatMap(entity -> {
            // here we can access headers, body and etc. since we have ResponseEntity here
            if (MediaType.APPLICATION_JSON.isCompatibleWith(entity.getHeaders().getContentType())) { // also matches charset variants; false when the header is absent
                return createFile(entity.getBody(), "no_file_payload_response"); // save no payload body to a file
            } else {
                return createFile(entity.getBody(), "file"); // save file body to a file
            }
        });

fileWritten.subscribe(); // just for testing purposes, subscribe where you want depending on your requirements

The method for saving our stream Publisher<DataBuffer> to file system:

private Mono<Void> createFile(Publisher<DataBuffer> body, String fileName) {
    Path path = Path.of("your_desired_path/" + fileName);
    return DataBufferUtils.write(body, path,
            StandardOpenOption.CREATE_NEW); // use OpenOption you want depending on your requirements
}

Also, as you can see, DataBufferUtils.write() lets us write the stream directly to a file.

We don't use any blocking APIs here, such as InputStream/OutputStream, and we never buffer the entire content of the file in memory at once.

Allista answered 31/1, 2023 at 11:26 Comment(0)
D
-11
RestResultMessage message= createWebClient()
                .get()
                .uri(uri)
                .exchange()
                .map(clientResponse -> {
                    //delegation
                    ClientResponseWrapper wrapper = new 
                                 ClientResponseWrapper(clientResponse);
                    return Mono.just(wrapper);
                })
                .block() // wait until the request completes
                .map(result -> {  
                    //convert to any data
                    if (!result.statusCode().isError()){
                       //extract the result from request
                        return create(RestResultMessage.Result.success, result.bodyToMono(String.class).block());
                    } else {
                        return create(RestResultMessage.Result.error, result.statusCode().name());
                    }
                })
                .block();
Dresden answered 30/1, 2020 at 9:1 Comment(0)
