Spring WebFlux Flux behavior with non streaming application/json
I am evaluating Spring WebFlux, but we have to support clients that expect application/json, not application/stream+json. I am unclear how Spring WebFlux handles serializing a Flux for a client that needs application/json.

If a Flux is serialized as application/json rather than application/stream+json, is it a blocking operation?

Below I've put together a sample controller to demonstrate what I am seeing. When the stream is infinite and produces application/json, nothing is returned to the browser. This seems reasonable, as it is probably waiting for the stream to terminate. When the stream is infinite and produces application/stream+json, I see JSON objects arriving continuously in the browser, as expected. When the Flux is finite, at say 100 elements, and the type is application/json, it renders all at once, as expected. The question is: does it have to wait for the Flux to terminate before serializing, and does that cause a blocking operation? What are the implications for performance and scalability of using Flux when returning normal application/json?

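import java.time.Duration;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class ReactiveController {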

    /* Note: In the browser this sits forever and never renders */
    @GetMapping(path = "/nonStreaming", produces = MediaType.APPLICATION_JSON_VALUE)
    public Flux<Person> getPeopleNonStreaming() {
        return Flux.interval(Duration.ofMillis(100))
                .map(tick -> new Person("Dude", "Dude", tick));
    }

    /* Note: This renders in the browser in chunks forever */
    @GetMapping(path = "/streaming", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Person> getPeopleStreaming() {
        return Flux.interval(Duration.ofMillis(100))
                .map(tick -> new Person("Dude", "Dude", tick));
    }

    /* Note: This returns, but I can't tell if it is done in a non blocking manner. It
     * appears to gather everything before serializing. */
    @GetMapping(path = "/finiteFlux", produces = MediaType.APPLICATION_JSON_VALUE)
    public Flux<Person> finiteFlux() {
        return Flux.range(0, 100)
                .map(tick -> new Person("Dude", "Dude", tick));
    }
}

UPDATE:

I've added additional logging information below:

The streaming endpoint appears to be using two different threads:

2019-02-13 16:53:07.363 DEBUG 3416 --- [ctor-http-nio-2] o.s.w.s.adapter.HttpWebHandlerAdapter    : [dac80fd4] HTTP GET "/streaming"
2019-02-13 16:53:07.384 DEBUG 3416 --- [ctor-http-nio-2] s.w.r.r.m.a.RequestMappingHandlerMapping : [dac80fd4] Mapped to public reactor.core.publisher.Flux<io.jkratz.reactivedemo.Person> io.jkratz.reactivedemo.ReactiveController.getPeopleStreaming()
2019-02-13 16:53:07.398 DEBUG 3416 --- [ctor-http-nio-2] o.s.w.r.r.m.a.ResponseBodyResultHandler  : Using 'application/stream+json;q=0.8' given [text/html, application/xhtml+xml, image/webp, image/apng, application/xml;q=0.9, */*;q=0.8] and supported [application/stream+json]
2019-02-13 16:53:07.398 DEBUG 3416 --- [ctor-http-nio-2] o.s.w.r.r.m.a.ResponseBodyResultHandler  : [dac80fd4] 0..N [io.jkratz.reactivedemo.Person]
2019-02-13 16:53:07.532 DEBUG 3416 --- [     parallel-1] o.s.http.codec.json.Jackson2JsonEncoder  : [dac80fd4] Encoding [io.jkratz.reactivedemo.Person@6b3e843d]
2019-02-13 16:53:07.566 DEBUG 3416 --- [ctor-http-nio-2] r.n.channel.ChannelOperationsHandler     : [id: 0xdac80fd4, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52398] Writing object DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/stream+json;q=0.8;charset=UTF-8
2019-02-13 16:53:07.591 DEBUG 3416 --- [ctor-http-nio-2] r.n.channel.ChannelOperationsHandler     : [id: 0xdac80fd4, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52398] Writing object 
2019-02-13 16:53:07.629 DEBUG 3416 --- [     parallel-1] o.s.http.codec.json.Jackson2JsonEncoder  : [dac80fd4] Encoding [io.jkratz.reactivedemo.Person@217d62db]
2019-02-13 16:53:07.630 DEBUG 3416 --- [ctor-http-nio-2] r.n.channel.ChannelOperationsHandler     : [id: 0xdac80fd4, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52398] Writing object 
2019-02-13 16:53:07.732 DEBUG 3416 --- [     parallel-1] o.s.http.codec.json.Jackson2JsonEncoder  : [dac80fd4] Encoding [io.jkratz.reactivedemo.Person@741c0c88]
2019-02-13 16:53:07.732 DEBUG 3416 --- [ctor-http-nio-2] r.n.channel.ChannelOperationsHandler     : [id: 0xdac80fd4, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52398] Writing object 
2019-02-13 16:53:07.832 DEBUG 3416 --- [     parallel-1] o.s.http.codec.json.Jackson2JsonEncoder  : [dac80fd4] Encoding [io.jkratz.reactivedemo.Person@7b8532e5]

The finite Flux with application/json, by contrast, uses only a single thread:

2019-02-13 16:55:34.431 DEBUG 3416 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter    : [5b048f46] HTTP GET "/finiteFlux"
2019-02-13 16:55:34.432 DEBUG 3416 --- [ctor-http-nio-3] s.w.r.r.m.a.RequestMappingHandlerMapping : [5b048f46] Mapped to public reactor.core.publisher.Flux<io.jkratz.reactivedemo.Person> io.jkratz.reactivedemo.ReactiveController.finiteFlux()
2019-02-13 16:55:34.434 DEBUG 3416 --- [ctor-http-nio-3] o.s.w.r.r.m.a.ResponseBodyResultHandler  : Using 'application/json;q=0.8' given [text/html, application/xhtml+xml, image/webp, image/apng, application/xml;q=0.9, */*;q=0.8] and supported [application/json]
2019-02-13 16:55:34.435 DEBUG 3416 --- [ctor-http-nio-3] o.s.w.r.r.m.a.ResponseBodyResultHandler  : [5b048f46] 0..N [io.jkratz.reactivedemo.Person]
2019-02-13 16:55:34.439 DEBUG 3416 --- [ctor-http-nio-3] o.s.http.codec.json.Jackson2JsonEncoder  : [5b048f46] Encoding [[io.jkratz.reactivedemo.Person@425c8296, io.jkratz.reactivedemo.Person@22ae73df, io.jkratz.reactived (truncated)...]
2019-02-13 16:55:34.448 DEBUG 3416 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler     : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] Writing object DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/json;q=0.8;charset=UTF-8
2019-02-13 16:55:34.448 DEBUG 3416 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler     : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] Writing object 
2019-02-13 16:55:34.450 DEBUG 3416 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter    : [5b048f46] Completed 200 OK
2019-02-13 16:55:34.450 DEBUG 3416 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] Last HTTP response frame
2019-02-13 16:55:34.450 DEBUG 3416 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler     : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] Writing object EmptyLastHttpContent
2019-02-13 16:55:34.450 DEBUG 3416 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] Decreasing pending responses, now 0
2019-02-13 16:55:34.451 DEBUG 3416 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler     : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] No ChannelOperation attached. Dropping: EmptyLastHttpContent
Judiejudith answered 13/2, 2019 at 21:51 Comment(0)
K
26

In the case of a streaming media type (application/stream+json), the JSON codec configured by default in Spring WebFlux serializes each element of the Flux input to JSON and flushes it to the network individually. This behavior is handy when the stream is infinite, or when you want to push information to the client as soon as it's available. Note that this has a performance cost, since calling the serializer and flushing many times takes resources.

In the case of a non-streaming type (application/json), the JSON codec configured by default in Spring WebFlux serializes to JSON and flushes to the network in one go. It buffers the Flux<YourObject> in memory and serializes it in one pass. This doesn't mean the operation is blocking, since the resulting Flux<DataBuffer> is written to the network in a reactive fashion. Nothing is blocking here.

This is just a tradeoff between "streaming data and using more resources" vs. "buffering and using resources more efficiently".
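To make the tradeoff concrete, here is a minimal sketch in plain Java, with no Spring or Reactor involved. The class and method names (EncodingModes, encodeStreaming, encodeBuffered) are hypothetical, and the elements are assumed to be already-serialized JSON strings. It only counts the chunks each mode would hand to the network; it is not the codec's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the two encoding modes, not Spring's code.
final class EncodingModes {

    // Streaming style: every element becomes its own newline-terminated chunk,
    // so a server would serialize and flush once per element.
    static List<String> encodeStreaming(List<String> jsonObjects) {
        List<String> chunks = new ArrayList<>();
        for (String json : jsonObjects) {
            chunks.add(json + "\n");
        }
        return chunks;
    }

    // Buffered style: all elements are joined into a single JSON array,
    // which is only possible once the last element is known.
    static List<String> encodeBuffered(List<String> jsonObjects) {
        return List.of("[" + String.join(",", jsonObjects) + "]");
    }

    public static void main(String[] args) {
        List<String> people = List.of("{\"id\":0}", "{\"id\":1}", "{\"id\":2}");
        System.out.println(encodeStreaming(people).size() + " chunks when streaming");
        System.out.println(encodeBuffered(people).size() + " chunk when buffered");
    }
}
```

The buffered variant needs the complete list up front, which is why an infinite source never produces a response in that mode.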

In the case of streaming, things are more likely to be processed by different worker threads, since work items become available at different times. A simple JSON response might be processed by one or more threads as well: it depends on the payload size and on whether the remote client is slow.

Kilkenny answered 14/2, 2019 at 9:52 Comment(2)
Sidenote: APPLICATION_STREAM_JSON_VALUE is now @Deprecated. - Duce
@Duce but APPLICATION_NDJSON_VALUE is not, and it behaves the same. - Zebra
C
4

It seems that all the magic happens inside the AbstractJackson2Encoder#encode method. This is the code for regular application/json serialization:

// non-streaming
return Flux.from(inputStream)
        .collectList() // make Mono<List<YourClass>> from Flux<YourClass>
        .map(list -> encodeValue(list, bufferFactory, listType, mimeType, hints)) // serialize list to JSON and write to DataBuffer
        .flux(); // get Flux<DataBuffer> from Mono<DataBuffer>

So, yes it waits for the Flux to terminate before serializing.
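Waiting for termination does not require a blocked thread, though. As a rough sketch of the same "collect, then encode once" shape, the example below uses the JDK's java.util.concurrent.Flow API as a stand-in for Reactor so that it runs on a plain JDK. The names CollectThenEncode and ArrayCollector are made up for illustration, and the elements are assumed to be pre-serialized JSON strings; this is not Spring's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical stand-in for collectList().map(encode), built on the JDK Flow API.
final class CollectThenEncode {

    // Buffers each element in a callback and produces the JSON array only on completion.
    static final class ArrayCollector implements Flow.Subscriber<String> {
        private final List<String> buffer = new ArrayList<>();
        final CompletableFuture<String> body = new CompletableFuture<>();

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // unbounded demand, like collectList()
        }

        @Override
        public void onNext(String json) {
            buffer.add(json); // returns immediately, nothing waits here
        }

        @Override
        public void onError(Throwable error) {
            body.completeExceptionally(error);
        }

        @Override
        public void onComplete() {
            body.complete("[" + String.join(",", buffer) + "]"); // encode once, at the end
        }
    }

    public static void main(String[] args) {
        ArrayCollector collector = new ArrayCollector();
        try (SubmissionPublisher<String> source = new SubmissionPublisher<>()) {
            source.subscribe(collector);
            source.submit("{\"id\":0}");
            source.submit("{\"id\":1}");
            System.out.println("body ready before completion? " + collector.body.isDone());
        } // close() sends onComplete to the subscriber
        System.out.println(collector.body.join());
    }
}
```

The collector only reacts to signals: if the source never completes, the body is never produced, but no thread sits waiting for it either. The join() call in main exists only so the demo can print the result before exiting.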

The performance benefit is questionable, because it always has to wait for all the data before serializing. So there is not much difference between a Flux and a regular List in the case of the application/json media type.

Chattanooga answered 10/4, 2020 at 18:56 Comment(0)
