How to correctly read Flux<DataBuffer> and convert it to a single inputStream

I'm using WebClient and a custom BodyExtractor class in my Spring Boot application.

WebClient webClient = WebClient.create();
webClient.get()
   .uri(url, params)
   .accept(MediaType.APPLICATION_XML)
   .exchange()
   .flatMap(response -> {
     return response.body(new BodyExtractor());
   });

BodyExtractor.java

@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
  Flux<DataBuffer> body = response.getBody();
  return body.map(dataBuffer -> {
    try {
      JAXBContext jc = JAXBContext.newInstance(SomeClass.class);
      Unmarshaller unmarshaller = jc.createUnmarshaller();

      return (T) unmarshaller.unmarshal(dataBuffer.asInputStream());
    } catch (Exception e) {
       return null;
    }
  }).next();
}

The code above works with a small payload but not with a large one. I think that's because I'm only reading a single Flux value with next(), and I'm not sure how to combine and read all of the DataBuffers.

I'm new to reactor, so I don't know a lot of tricks with flux/mono.

Instruct answered 28/9, 2017 at 3:42 Comment(0)
I
5

I was able to make it work by using Flux#collect and SequenceInputStream

@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
  Flux<DataBuffer> body = response.getBody();
  return body.collect(InputStreamCollector::new, (t, dataBuffer) -> t.collectInputStream(dataBuffer.asInputStream()))
    .map(collector -> {
      try {
        JAXBContext jc = JAXBContext.newInstance(SomeClass.class);
        Unmarshaller unmarshaller = jc.createUnmarshaller();

        return (T) unmarshaller.unmarshal(collector.getInputStream());
      } catch (Exception e) {
        // Reactor's map() rejects null results, so surface the failure instead
        throw new IllegalStateException("Failed to unmarshal response body", e);
      }
  });
}

InputStreamCollector.java

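import java.io.InputStream;
import java.io.SequenceInputStream;

public class InputStreamCollector {
  private InputStream is;

  // Appends the next chunk; the first chunk is stored as-is
  public void collectInputStream(InputStream is) {
    if (this.is == null) {
      this.is = is;
    } else {
      this.is = new SequenceInputStream(this.is, is);
    }
  }

  public InputStream getInputStream() {
    return this.is;
  }
}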
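
One detail worth checking in a collector of this shape is the very first call: the first chunk should be stored as it is, and only later chunks wrapped in a SequenceInputStream, otherwise the first chunk ends up in the result twice. Below is a minimal JDK-only sketch of that folding step, assuming Java 11+. ChunkCollector and joinChunks are hypothetical names used for illustration, and byte arrays stand in for the DataBuffers.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical stand-in for the collector above; names are illustrative only.
final class ChunkCollector {
    private InputStream joined;

    // Appends the next chunk. The first chunk is kept as-is, not wrapped with itself.
    void add(InputStream next) {
        joined = (joined == null) ? next : new SequenceInputStream(joined, next);
    }

    // Returns the concatenated stream, or an empty stream if nothing was added.
    InputStream stream() {
        return joined == null ? InputStream.nullInputStream() : joined;
    }

    // Folds the chunks in order and reads the result back as a UTF-8 string.
    static String joinChunks(List<String> chunks) {
        ChunkCollector collector = new ChunkCollector();
        for (String chunk : chunks) {
            collector.add(new ByteArrayInputStream(chunk.getBytes(StandardCharsets.UTF_8)));
        }
        try (InputStream in = collector.stream()) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling ChunkCollector.joinChunks(List.of("<a>", "<b/>", "</a>")) reads the three chunks back as one document. Everything is still held in memory, so this only illustrates the concatenation, not streaming.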
Instruct answered 28/9, 2017 at 6:33 Comment(8)
Why are you writing your own BodyExtractor? WebFlux already supports JAXB with Jaxb2XmlDecoder. - Xiaoximena
@BrianClozel Do I need to configure something for it to work? bodyToMono doesn't seem to pick up my POJOs. - Instruct
What's InputStreamCollector? - Chickabiddy
@AbhijitSarkar Please check my usage above. - Instruct
Interesting, but WebClient is the wrong tool for this job. You're reconstructing the response InputStream, so you get no advantage from using WebClient. You're better off using a plain vanilla HTTP client. - Chickabiddy
Doesn't this solution read the whole response body into memory? ByteBuffer stores all its data in memory, right? So the resulting InputStream will be the same as a ByteArrayInputStream, and this solution doesn't handle big data. - Manducate
I am using a similar approach. Two recommendations I would add: 1. hold a reference to the JAXBContext, and 2. use dataBuffer.asInputStream(true), so that an eventual close of the InputStream also releases the DataBuffer. - Knobkerrie
Thank you very much. I had to reprocess some files while uploading, so I had to collect all chunks. You can run into all kinds of problems with large files. - Debarath
S
28

This is really not as complicated as other answers imply.

The only way to stream the data without buffering it all in memory is to use a pipe, as @jin-kwon suggested. However, it can be done very simply by using Spring's BodyExtractors and DataBufferUtils utility classes.

Example:

private InputStream readAsInputStream(String url) throws IOException {
    PipedOutputStream osPipe = new PipedOutputStream();
    PipedInputStream isPipe = new PipedInputStream(osPipe);

    ClientResponse response = webClient.get().uri(url)
        .accept(MediaType.APPLICATION_XML)
        .exchange()
        .block();
    final int statusCode = response.rawStatusCode();
    // check HTTP status code, can throw exception if needed
    // ....

    Flux<DataBuffer> body = response.body(BodyExtractors.toDataBuffers())
        .doOnError(t -> {
            log.error("Error reading body.", t);
            // close pipe to force InputStream to error,
            // otherwise the returned InputStream will hang forever if an error occurs
            try(isPipe) {
              //no-op
            } catch (IOException ioe) {
                log.error("Error closing streams", ioe);
            }
        })
        .doFinally(s -> {
            try(osPipe) {
              //no-op
            } catch (IOException ioe) {
                log.error("Error closing streams", ioe);
            }
        });

    DataBufferUtils.write(body, osPipe)
        .subscribe(DataBufferUtils.releaseConsumer());

    return isPipe;
}

If you don't care about checking the response code or throwing an exception for a failure status code, you can skip the block() call and intermediate ClientResponse variable by using

flatMapMany(r -> r.body(BodyExtractors.toDataBuffers()))

instead.
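
The pipe mechanics can be tried out without Spring. The following is a minimal JDK-only sketch, assuming Java 9+ (for try-with-resources on an existing variable), in which a plain writer thread stands in for the reactive DataBufferUtils.write subscription. PipedStreamSketch and roundTrip are hypothetical names. It shows the two points the answer relies on: the writer and the reader run on different threads, and closing the output side is what lets the reader see EOF.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

final class PipedStreamSketch {
    // Writes the chunks from a separate thread and reads them back through the pipe.
    static String roundTrip(List<String> chunks) {
        try {
            PipedOutputStream out = new PipedOutputStream();
            PipedInputStream in = new PipedInputStream(out);

            // The writer thread plays the role of the reactive subscription.
            Thread writer = new Thread(() -> {
                // Closing the output side signals end-of-input to the reader.
                try (out) {
                    for (String chunk : chunks) {
                        out.write(chunk.getBytes(StandardCharsets.UTF_8));
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();

            // The caller owns the input side and closes it when done reading.
            try (in) {
                String result = new String(in.readAllBytes(), StandardCharsets.UTF_8);
                writer.join();
                return result;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

If the writer never closed its side, readAllBytes() would keep waiting, which is the hang the doOnError and doFinally handlers in the answer are meant to prevent.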

Sulfonation answered 7/10, 2019 at 14:50 Comment(10)
Looks promising and simple; this is probably the proper answer for handling large requests. I'll try this one out if I have the time. - Instruct
I should add that I do agree with @abhijit-sarkar's earlier comment that WebClient is not the best tool for this job. Although it can be done (as I have demonstrated), it is not the most efficient way of doing this. If all you need is an InputStream, you are better off using a synchronous client such as java.net.http.HttpClient. If you are stuck with WebClient, then I believe my solution is the best option. - Sulfonation
It seems like if there's no error, isPipe never gets closed. - Chickabiddy
@AbhijitSarkar There is no need to close isPipe here. That is the responsibility of the caller of this readAsInputStream() method (i.e. to use the InputStream in a try-with-resources block). The doFinally() here ensures that osPipe always gets closed, which signals to the sink (isPipe) that the end of the input has been reached, so it returns -1/EOF on the next read. - Sulfonation
Change PipedInputSteam to PipedInputStream and MediaType.APPLICATION.XML to MediaType.APPLICATION_XML. I got rid of the status code check, so I needed to use flatMapMany(r -> r.body(BodyExtractors.toDataBuffers())) instead of flatMap(r -> r.body(BodyExtractors.toDataBuffers())). - Thadeus
Try-with-resources on a variable declared outside of the try block requires Java 9 or later. - Knobkerrie
This did not work for me on Java 8 with reactor-core 3.3.9.RELEASE. The PipedInputStream and PipedOutputStream contain only 0's with no termination. It hangs my unmarshaller in the call unmarshaller.unmarshal(isPipe). In fact, in my debugger, doFinally never gets called, which is suspect. - Knobkerrie
This did not work for me; I'm getting java.io.IOException: Pipe not connected. Any clue on this? - Elation
.exchange() is deprecated. Is there an alternative? @Sulfonation - Hexameter
Using .exchangeToFlux(x -> { return x.body(BodyExtractors.toDataBuffers() .... }) works, as does replacing the try-with-resources with a simple try { isPipe.close(); } catch... (on Spring Boot 2.7.6 / Java 8). - Wende
C
10

A slightly modified version of Bk Santiago's answer makes use of reduce() instead of collect(). Very similar, but doesn't require an extra class:

Java:

body.reduce(new InputStream() {
    public int read() { return -1; }
  }, (InputStream s, DataBuffer d) -> new SequenceInputStream(s, d.asInputStream())
).flatMap(inputStream -> /* do something with single InputStream */);

Or Kotlin:

body.reduce(object : InputStream() {
  override fun read() = -1
}) { s: InputStream, d -> SequenceInputStream(s, d.asInputStream()) }
  .flatMap { inputStream -> /* do something with single InputStream */ }

The benefit of this approach over collect() is simply that you don't need a separate class to gather things up.

I created a new empty InputStream as the initial value, but if that syntax is confusing, you can use an empty ByteArrayInputStream instead: ByteArrayInputStream("".toByteArray()) in Kotlin.
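
The shape of this fold can be checked with plain JDK classes. In the sketch below, which assumes Java 11+ for InputStream.nullInputStream(), a java.util.stream.Stream stands in for the Flux and strings stand in for the DataBuffers. ReduceSketch and joinWithReduce are hypothetical names; only the idea of reducing from an empty initial stream is taken from the answer.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

final class ReduceSketch {
    // Reduces the chunks into one InputStream, starting from an empty stream,
    // then reads the result back as a UTF-8 string.
    static String joinWithReduce(List<String> chunks) {
        InputStream joined = chunks.stream()
                .map(chunk -> (InputStream) new ByteArrayInputStream(
                        chunk.getBytes(StandardCharsets.UTF_8)))
                // Empty identity stream plus pairwise concatenation.
                .reduce(InputStream.nullInputStream(), SequenceInputStream::new);
        try (joined) {
            return new String(joined.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Starting from an empty stream means an empty body still yields a valid, immediately exhausted InputStream rather than no value at all.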

Chemistry answered 14/8, 2018 at 15:22 Comment(1)
Instead of new InputStream() { public int read() { return -1; } } you can use InputStream.nullInputStream(). - Fantom
S
8

Here is another variant of the other answers. It is still not memory-friendly.

// WARNING: NOT-MEMORY-FRIENDLY!
// WARNING: NOT-MEMORY-FRIENDLY!
// WARNING: NOT-MEMORY-FRIENDLY!
static Mono<InputStream> asStream(WebClient.ResponseSpec response) {
    return response.bodyToFlux(DataBuffer.class)
        .map(b -> b.asInputStream(true))
        .reduce(SequenceInputStream::new);
}

static void doSome(WebClient.ResponseSpec response) {
    asStream(response)
        .doOnNext(stream -> {
            // do something with the stream
            // and close it when done!
        })
        .block();
}
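
Note that reduce(SequenceInputStream::new) nests one SequenceInputStream per buffer, so reads and close() pass through as many wrapper levels as there are buffers. One way to avoid the nesting, sketched here with JDK classes only (Java 11+), is to gather the streams into a list first and build a single SequenceInputStream over an Enumeration. FlatSequenceSketch, flatten and countBytes are hypothetical names; in a reactive pipeline the list could come from collectList(), and the result is just as memory-hungry as the code above.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class FlatSequenceSketch {
    // Builds one SequenceInputStream over all chunks instead of one wrapper per chunk.
    static InputStream flatten(List<? extends InputStream> streams) {
        return new SequenceInputStream(Collections.enumeration(streams));
    }

    // Creates many small chunks, reads them through the flat stream, and returns the byte count.
    static long countBytes(int chunkCount, int chunkSize) {
        List<InputStream> streams = new ArrayList<>(chunkCount);
        for (int i = 0; i < chunkCount; i++) {
            streams.add(new ByteArrayInputStream(new byte[chunkSize]));
        }
        try (InputStream in = flatten(streams)) {
            // Drain the stream; closing it afterwards also closes the remaining chunks.
            return in.transferTo(OutputStream.nullOutputStream());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the flat form, the depth of the call chain no longer grows with the number of buffers.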
Sumerology answered 13/10, 2019 at 9:42 Comment(4)
Super easy when dealing with small files. - Extrinsic
@Tires I have real doubts about DataBuffer::asInputStream. See asInputStream(). - Sumerology
@JinKwon You're right. I'm wondering why I didn't see the Netty warning about unreleased buffers before. - Midsection
Be careful. If you close the SequenceInputStream (which you should, otherwise you will get unreleased buffer errors from Netty), it can very easily cause a StackOverflowError if you have a big file or lots of small buffers. - Ornie
C
4

There's a much cleaner way to do this using the underlying reactor-netty HttpClient directly, instead of using WebClient. The composition hierarchy is like this:

WebClient -uses-> HttpClient -uses-> TcpClient

Easier to show code than explain:

HttpClient.create()
    .get()
    .uri(url) // the request URI
    .responseContent() // ByteBufFlux
    .aggregate() // ByteBufMono
    .asInputStream() // Mono<InputStream>
    .block(); // We got an InputStream, yay!

However, as I've pointed out already, reading from an InputStream is a blocking operation, which defeats the purpose of using a non-blocking HTTP client, not to mention that this aggregates the whole response in memory. See this for a Java NIO vs. IO comparison.

Chickabiddy answered 27/7, 2020 at 6:24 Comment(1)
I understand that using the InputStream is a blocking operation. However, imagine a scenario where I need to get an XML file from another server and then use some data from it. I would like to fetch the file in a non-blocking way and then just parse it. Isn't this the way to go? I don't want to block the thread during the network call, but once I have the whole response with all the data, I am OK with converting it to an InputStream or parsing it in a blocking way. Or is there a catch I am missing (e.g. the whole operation, including the network call, has to be blocking with this approach)? - Bohemia
S
2

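You can use pipes. The snippets below appear to rely on static imports of Mono.using, Mono.just and Mono.fromFuture, DataBufferUtils.write and DataBufferUtils.releaseConsumer, and CompletableFuture.supplyAsync.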

static <R> Mono<R> pipeAndApply(
        final Publisher<DataBuffer> source, final Executor executor,
        final Function<? super ReadableByteChannel, ? extends R> function) {
    return using(Pipe::open,
                 p -> {
                     executor.execute(() -> write(source, p.sink())
                             .doFinally(s -> {
                                 try {
                                     p.sink().close();
                                 } catch (final IOException ioe) {
                                     log.error("failed to close pipe.sink", ioe);
                                     throw new RuntimeException(ioe);
                                 }
                             })
                             .subscribe(releaseConsumer()));
                     return just(function.apply(p.source()));
                 },
                 p -> {
                     try {
                         p.source().close();
                     } catch (final IOException ioe) {
                         log.error("failed to close pipe.source", ioe);
                         throw new RuntimeException(ioe);
                     }
                 });
}

Or using CompletableFuture,

static <R> Mono<R> pipeAndApply(
        final Publisher<DataBuffer> source,
        final Function<? super ReadableByteChannel, ? extends R> function) {
    return using(Pipe::open,
                 p -> fromFuture(supplyAsync(() -> function.apply(p.source())))
                         .doFirst(() -> write(source, p.sink())
                                 .doFinally(s -> {
                                     try {
                                         p.sink().close();
                                     } catch (final IOException ioe) {
                                         log.error("failed to close pipe.sink", ioe);
                                         throw new RuntimeException(ioe);
                                     }
                                 })
                                 .subscribe(releaseConsumer())),
                 p -> {
                     try {
                         p.source().close();
                     } catch (final IOException ioe) {
                         log.error("failed to close pipe.source", ioe);
                         throw new RuntimeException(ioe);
                     }
                 });
}
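
The java.nio.channels.Pipe mechanics used above can be tried in isolation. This is a minimal JDK-only sketch, assuming Java 9+, where a plain thread stands in for the reactive write(source, p.sink()) subscription and Channels.newInputStream adapts the source channel for blocking reads. NioPipeSketch and roundTrip are hypothetical names.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;
import java.util.List;

final class NioPipeSketch {
    // Writes the chunks into the pipe's sink on one thread and reads them from its source on another.
    static String roundTrip(List<String> chunks) {
        try {
            Pipe pipe = Pipe.open();

            Thread writer = new Thread(() -> {
                // Closing the sink tells the source side that the input has ended.
                try (Pipe.SinkChannel sink = pipe.sink()) {
                    for (String chunk : chunks) {
                        ByteBuffer buffer = ByteBuffer.wrap(chunk.getBytes(StandardCharsets.UTF_8));
                        while (buffer.hasRemaining()) {
                            sink.write(buffer);
                        }
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();

            // Closing the stream also closes the underlying source channel.
            try (InputStream in = Channels.newInputStream(pipe.source())) {
                String result = new String(in.readAllBytes(), StandardCharsets.UTF_8);
                writer.join();
                return result;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

As in the answer, the sink and the source are closed separately: the writer closes the sink when it has finished, and the reader closes the source when it has consumed everything.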
Sumerology answered 12/5, 2019 at 5:36 Comment(0)
