Spring WebFlux: convert Flux&lt;DataBuffer&gt; to InputStream
I'm currently working with Spring WebFlux, and I'm trying to upload a large file (70 MB).

My Controller

@RequestMapping(method = RequestMethod.POST, consumes = MediaType.MULTIPART_FORM_DATA_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
public Flux<String> uploadHandler(@RequestBody Flux<Part> fluxParts, @RequestParam(value = "categoryType") String categoryType, @PathVariable(value = "traceabilityReportUuid") String traceabilityUuid) {
    return documentHandler.upload(fluxParts, UUID.fromString(traceabilityUuid), categoryType);
}

My Service

public Flux<String> upload(Flux<Part> fluxParts, UUID traceabilityUuid, String categoryType) {

    return fluxParts
            .filter(part -> part instanceof FilePart)
            .ofType(FilePart.class)
            .flatMap(p -> this.upload(p, traceabilityUuid, categoryType));


}


private Mono<String> upload(FilePart filePart, UUID traceabilityUuid, String categoryType) {

    return filePart.content().collect(InputStreamCollector::new, (t, dataBuffer) -> t.collectInputStream(dataBuffer.asInputStream()))
            .flatMap(inputStreamCollector -> {
                upload(traceabilityUuid, inputStreamCollector.getInputStream(), filePart.filename(), categoryType);

                return Mono.just("OK");
            });
}

My Collector

public class InputStreamCollector {

    private InputStream is;

    public void collectInputStream(InputStream is) {
        if (this.is == null) this.is = is;
        this.is = new SequenceInputStream(this.is, is);
    }

    public InputStream getInputStream() {
        return this.is;
    }
}

At the end, I retrieve the full InputStream via inputStreamCollector.getInputStream() and pass it to my object.

I use this object to send the file to an S3 bucket.

But before sending it to S3, I have to convert it into a file (using Apache tools), and I get a StackOverflowError:

java.lang.StackOverflowError: null
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)

It works fine with a small file (7 MB).
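For what it's worth, the nesting itself can be reproduced without Spring. This is a minimal JDK-only sketch (hypothetical chunk contents) of how reducing streams with SequenceInputStream builds a left-leaning chain; every wrapped level adds one stack frame per read(), which is why a large file split into many DataBuffers eventually overflows the stack:

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.List;

public class Nesting {
    // Concatenate chunks the same way the collector does: each step wraps
    // the previous stream in another SequenceInputStream, so reading a byte
    // later descends through every level of the chain.
    static InputStream concat(List<byte[]> chunks) {
        return chunks.stream()
                .map(ByteArrayInputStream::new)
                .map(InputStream.class::cast)
                .reduce(SequenceInputStream::new)
                .orElse(InputStream.nullInputStream());
    }

    public static void main(String[] args) throws Exception {
        List<byte[]> chunks = List.of("Hel".getBytes(), "lo ".getBytes(), "Flux".getBytes());
        System.out.println(new String(concat(chunks).readAllBytes())); // Hello Flux
    }
}
```

Reading works for a handful of chunks, but with the small buffer sizes WebFlux emits, a 70 MB upload produces a chain deep enough to blow the default thread stack.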

Do you have an idea how to resolve this issue?

Preference answered 3/8, 2018 at 16:42 Comment(0)

I finally found the solution!

https://github.com/entzik/reactive-spring-boot-examples/blob/master/src/main/java/com/thekirschners/springbootsamples/reactiveupload/ReactiveUploadResource.java

I adapted the code to return an InputStream, and it works fine with large files ;-)

Preference answered 4/8, 2018 at 21:41 Comment(3)
In the end, it's not the right solution :) the best one is blog.davidvassallo.me/2018/07/09/… – Preference
Did you end up using multipart/file, and not a buffer? – Morell
Note that Spring parses FilePart from the request body synchronously. When uploading a big file as one multipart file, the data is first transferred to a temporary file: System.getProperty("java.io.tmpdir") + "/nio-file-upload/nio-body-<partIndex>-<UUID>.tmp". Only when the upload is finished does Flux<Part> start emitting events. I think Flux<Part> is useful when uploading multiple tiny files. – Milano

To convert a DataBuffer to a String or a List you could use Apache IOUtils. In this sample I'm returning a Flux, and to avoid a try/catch I wrapped the call in Mono.fromCallable.

protected Flux<String> getLines(final DataBuffer dataBuffer) {
    return Mono.fromCallable(() -> IOUtils.readLines(dataBuffer.asInputStream(), Charsets.UTF_8))
        .flatMapMany(Flux::fromIterable);
}
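For reference, the same thing can be done with only the JDK (no Apache Commons dependency). This sketch is still blocking, so in a reactive pipeline it would equally belong inside Mono.fromCallable, ideally on a boundedElastic scheduler:

```java
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

public class Lines {
    // JDK-only equivalent of IOUtils.readLines: eagerly read every line
    // from the buffer's InputStream. Still a blocking call.
    static List<String> readLines(InputStream in) {
        return new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
                .lines()
                .collect(Collectors.toList());
    }
}
```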
Affettuoso answered 25/1, 2019 at 13:47 Comment(1)
Using blocking APIs in a non-blocking framework is not desirable. – Impossibility

Given a Flux<DataBuffer> fluxPublisher

final var publisher = fluxPublisher
  .map(dataBuffer -> dataBuffer.asInputStream(true))
  .reduce(SequenceInputStream::new);

This will give you a Mono&lt;InputStream&gt;.

The result must be closed after processing, otherwise it will leak. The following is an example of some parsing using a Reader, then closing:

publisher
  .flatMap(is -> {
    try (var reader = new InputStreamReader(is)) {
      // parse with the reader, then return the result;
      // parse(reader) here is a hypothetical method
      return Mono.just(parse(reader));
    } catch (IOException e) {
      return Mono.error(e);
    }
  });

Note that every path of the flatMap lambda must return a Publisher, including the success branch.
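An alternative that avoids deep SequenceInputStream nesting altogether is a pipe: one thread writes chunks, the consumer reads a single flat stream. A JDK-only sketch (the streamOf helper and chunk source are hypothetical; in WebFlux the writer side would be a subscriber to the Flux&lt;DataBuffer&gt;, which is not shown here):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class Piped {
    // Push chunks into a PipedOutputStream from a writer thread while the
    // consumer reads one flat PipedInputStream: constant stack depth,
    // regardless of how many chunks arrive.
    static InputStream streamOf(Iterable<byte[]> chunks) throws IOException {
        PipedInputStream in = new PipedInputStream(8192);
        PipedOutputStream out = new PipedOutputStream(in);
        Thread writer = new Thread(() -> {
            try (out) {
                for (byte[] chunk : chunks) {
                    out.write(chunk);
                }
            } catch (IOException ignored) {
                // reader closed early; nothing more to write
            }
        });
        writer.start();
        return in;
    }

    public static void main(String[] args) throws Exception {
        InputStream in = streamOf(java.util.List.of("big ".getBytes(), "file".getBytes()));
        System.out.println(new String(in.readAllBytes())); // big file
    }
}
```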
Benally answered 6/9, 2022 at 11:42 Comment(0)

This example will help you understand how to load data from a FilePart:

public static Mono<String> readBase64Content(FilePart filePart) {
    // Join all buffers first (org.springframework.core.io.buffer.DataBufferUtils):
    // encoding each buffer separately and taking the last element would
    // drop every chunk except the final one.
    return DataBufferUtils.join(filePart.content())
            .map(dataBuffer -> {
                byte[] bytes = new byte[dataBuffer.readableByteCount()];
                dataBuffer.read(bytes);
                DataBufferUtils.release(dataBuffer);
                return Base64.getEncoder().encodeToString(bytes);
            });
}

Rest method

@PostMapping(value = "/person/{personId}/photo", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
Mono<String> uploadPhoto(@PathVariable Long personId, @RequestPart("photo") Mono<FilePart> photo) {
    return photo.ofType(FilePart.class).flatMap(StringUtil::readBase64Content);
}
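One thing to be careful of: Base64 cannot be applied per DataBuffer and the results concatenated, because padding depends on each chunk's length; the buffers have to be joined before encoding. A quick JDK-only check with hypothetical chunk contents:

```java
import java.util.Base64;

public class ChunkedBase64 {
    public static void main(String[] args) {
        // Encoding the whole payload at once:
        String joined = Base64.getEncoder().encodeToString("hello world".getBytes());
        // Encoding two chunks separately and concatenating is NOT the same
        // unless every chunk length is a multiple of 3 ("hello" is 5 bytes,
        // so its encoding ends with '=' padding mid-string):
        String a = Base64.getEncoder().encodeToString("hello".getBytes());
        String b = Base64.getEncoder().encodeToString(" world".getBytes());
        System.out.println(joined.equals(a + b)); // false
    }
}
```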
Stubby answered 1/8, 2020 at 21:48 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.