Spring webflux and reading from database

Spring 5 introduces the reactive programming style for REST APIs with WebFlux. I'm fairly new to it myself and was wondering whether wrapping synchronous calls to a database in a Flux or Mono makes sense performance-wise. If yes, is this the way to do it?

@RestController
public class HomeController {

    private MeasurementRepository repository;

    public HomeController(MeasurementRepository repository){
        this.repository = repository;
    }

    @GetMapping(value = "/v1/measurements")
    public Flux<Measurement> getMeasurements() {
        return Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L)));
    }

}

Is there something like an asynchronous CrudRepository? I couldn't find it.

Essentialism answered 17/2, 2017 at 13:40 Comment(3)
JDBC code is inherently synchronous; there aren't any reactive JDBC drivers out there (and I doubt there ever will be). So accessing a database like this doesn't really make sense. - Bozarth
I am not familiar with Flux, but I know you can use a Java 8 Stream as the return type in Spring Data JPA. You can return Stream<Measurement>. Not sure if this comment helps or not though :) - Imminent
It's a good start, but it's not asynchronous, so the caller blocks whilst the JDBC operation is in progress, which breaks the WebFlux non-blocking paradigm. - Guilty

One option would be to use alternative SQL clients that are fully non-blocking. Some examples include: https://github.com/mauricio/postgresql-async or https://github.com/finagle/roc. Of course, none of these drivers is officially supported by database vendors yet. Also, their functionality is much less attractive compared to mature JDBC-based abstractions such as Hibernate or jOOQ.

The alternative idea came to me from the Scala world. The idea is to dispatch blocking calls onto an isolated thread pool so that blocking and non-blocking calls are not mixed together. This lets us control the overall number of threads and lets the CPU serve non-blocking tasks in the main execution context, with some potential optimizations. Assuming that we have a JDBC-based implementation such as Spring Data JPA, which is indeed blocking, we can make its execution asynchronous and dispatch it onto the dedicated thread pool.

@RestController
public class HomeController {

    private final MeasurementRepository repository;
    private final Scheduler scheduler;

    public HomeController(MeasurementRepository repository, @Qualifier("jdbcScheduler") Scheduler scheduler) {
        this.repository = repository;
        this.scheduler = scheduler;
    }

    @GetMapping(value = "/v1/measurements")
    public Flux<Measurement> getMeasurements() {
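        // subscribeOn rather than publishOn, as the comments below point out:
        // publishOn would leave the blocking call on the subscribing thread.
        return Mono.fromCallable(() -> repository.findByFromDateGreaterThanEqual(new Date(1486980000L)))
                .flatMapMany(Flux::fromIterable) // unwrap the returned collection into a Flux<Measurement>
                .subscribeOn(scheduler);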
    }

}

Our Scheduler for JDBC should be backed by a dedicated thread pool whose size equals the number of connections in the connection pool.

@Configuration
public class SchedulerConfiguration {
    private final Integer connectionPoolSize;

    public SchedulerConfiguration(@Value("${spring.datasource.maximum-pool-size}") Integer connectionPoolSize) {
        this.connectionPoolSize = connectionPoolSize;
    }

    @Bean
    public Scheduler jdbcScheduler() {
        return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
    }

}
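
The effect of that sizing rule can be tried without Spring or Reactor. Below is a minimal sketch, assuming plain JDK CompletableFuture as a stand-in for the Reactor Scheduler, a hypothetical pool size of 2, and Thread.sleep in place of a real JDBC call. It only illustrates that a fixed pool caps how many blocking calls are in flight at once; it is not the configuration above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedBlockingPoolSketch {

    // Hypothetical value standing in for the datasource's maximum pool size.
    static final int CONNECTION_POOL_SIZE = 2;

    /**
     * Runs taskCount simulated blocking queries on a fixed pool and returns
     * the highest number that were ever in flight at the same time.
     */
    static int maxConcurrentQueries(int taskCount) {
        ExecutorService jdbcPool = Executors.newFixedThreadPool(CONNECTION_POOL_SIZE);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                futures.add(CompletableFuture.runAsync(() -> {
                    int now = inFlight.incrementAndGet();
                    maxInFlight.accumulateAndGet(now, Math::max);
                    simulateBlockingQuery();
                    inFlight.decrementAndGet();
                }, jdbcPool));
            }
            // This demo waits for completion; a non-blocking stack would chain callbacks instead.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return maxInFlight.get();
        } finally {
            jdbcPool.shutdown();
        }
    }

    private static void simulateBlockingQuery() {
        try {
            Thread.sleep(20); // stand-in for a JDBC round trip
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("max concurrent queries: " + maxConcurrentQueries(8));
    }
}
```

However many queries are submitted, the reported maximum never exceeds CONNECTION_POOL_SIZE, so no task waits on the connection pool while occupying a thread.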

However, there are difficulties with this approach. The main one is transaction management. In JDBC, a transaction is possible only within a single java.sql.Connection. To perform several operations in one transaction, they have to share a connection. If we want to do some calculations in between them, we have to hold on to the connection. This is not very efficient, as we keep a limited number of connections idle while the calculations run.
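
Part of why this matters in practice: frameworks commonly look up the transactional connection through the current thread (Spring's transaction support binds it to a thread-local). Below is a minimal sketch of the consequence, using only the JDK. The ThreadLocal and the connection names are hypothetical stand-ins, not Spring's API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadBoundConnectionSketch {

    // Hypothetical stand-in for a framework's thread-bound connection holder.
    private static final ThreadLocal<String> CURRENT_CONNECTION = new ThreadLocal<>();

    /** Binds a connection on the calling thread, then looks it up from a pool thread. */
    static boolean visibleOnPoolThread() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CURRENT_CONNECTION.set("conn-1");
        try {
            return CompletableFuture
                    .supplyAsync(() -> CURRENT_CONNECTION.get() != null, pool)
                    .join();
        } finally {
            CURRENT_CONNECTION.remove();
            pool.shutdown();
        }
    }

    /** Runs both operations inside one task, so they share the thread-bound connection. */
    static boolean sharedWithinOneTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(() -> {
                CURRENT_CONNECTION.set("conn-2"); // "begin transaction"
                try {
                    String first = CURRENT_CONNECTION.get();  // first operation
                    String second = CURRENT_CONNECTION.get(); // second operation
                    return first != null && first.equals(second);
                } finally {
                    CURRENT_CONNECTION.remove(); // "commit" and release
                }
            }, pool).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("visible on pool thread: " + visibleOnPoolThread()); // false
        System.out.println("shared within one task: " + sharedWithinOneTask()); // true
    }
}
```

Under that assumption, the whole transactional unit of work has to be dispatched as a single task, which is exactly when the connection stays occupied for the duration of any calculations in between.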

This idea of an asynchronous JDBC wrapper is not new and is already implemented in the Scala library Slick 3. Finally, non-blocking JDBC may eventually appear on the Java roadmap: it was announced at JavaOne in September 2016, and it is possible that we will see it in Java 10.

Celestinacelestine answered 22/5, 2017 at 17:30 Comment(5)
Is there a recommended option or WIP project that implements the CrudRepository? It's a shame to say that SQL projects can't use Reactor/WebFlux until Java 10 at the earliest. - Guilty
@Guilty from multiple sources I can see that this work is not planned yet, at least by Pivotal: jira.spring.io/browse/DATAJPA-701 spring.io/blog/2017/02/23/… - Celestinacelestine
The publishOn(scheduler) call looks weird to me in the code example. Shouldn't you use subscribeOn instead, which runs the request() to this publisher on the given scheduler? - Ancona
It should instead be the subscribeOn() method using a thread pool, not publishOn() as the code above has. - Sickly
What about inserts? Do we need to commit the transaction ourselves? I suspect that @Transactional will not work in this case, because we return from the method before the work is performed in the other thread. - Jubilant

Based on this blog, you should rewrite your snippet in the following way:

@GetMapping(value = "/v1/measurements")
public Flux<Measurement> getMeasurements() {
    return Flux.defer(() -> Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L))))
           .subscribeOn(Schedulers.elastic());
}
Fernandes answered 15/8, 2018 at 21:0 Comment(0)

Obtaining a Flux or a Mono doesn’t necessarily mean it will run in a dedicated Thread. Instead, most operators continue working in the Thread on which the previous operator executed. Unless specified, the topmost operator (the source) itself runs on the Thread in which the subscribe() call was made.

If you have blocking persistence APIs (JPA, JDBC) or networking APIs to use, Spring MVC is the best choice for common architectures at least. It is technically feasible with both Reactor and RxJava to perform blocking calls on a separate thread but you would not be making the most of a non-blocking web stack.

So... How do I wrap a synchronous, blocking call?

Use a Callable to defer execution, and run it on Schedulers.elastic(), because that scheduler creates a dedicated thread to wait for the blocking resource without tying up some other resource.

  • Schedulers.immediate() : Current thread.
  • Schedulers.single() : A single, reusable thread.
  • Schedulers.newSingle() : A per-call dedicated thread.
  • Schedulers.elastic() : An elastic thread pool. It creates new worker pools as needed and reuses idle ones. This is a good choice for blocking I/O work, for instance.
  • Schedulers.parallel() : A fixed pool of workers that is tuned for parallel work.

Example:

Mono.fromCallable(() -> blockingRepository.save())
        .subscribeOn(Schedulers.elastic());
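
The "defer with a Callable" part can also be seen in isolation. Here is a dependency-free sketch, assuming a JDK cached thread pool as a rough analogue of an elastic scheduler and a Supplier of CompletableFuture as a rough analogue of a cold Mono. The defer helper and blockingSave are hypothetical names for illustration, not Reactor API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class DeferredBlockingCallSketch {

    // Counts how often the simulated blocking call has actually run.
    static final AtomicInteger SAVE_CALLS = new AtomicInteger();

    // Hypothetical blocking call standing in for a repository save.
    static String blockingSave(String entity) {
        SAVE_CALLS.incrementAndGet();
        return "saved:" + entity;
    }

    /** Wraps a Callable so it runs on the given pool only when the Supplier is invoked. */
    static <T> Supplier<CompletableFuture<T>> defer(Callable<T> call, ExecutorService pool) {
        return () -> CompletableFuture.supplyAsync(() -> {
            try {
                return call.call();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, pool);
    }

    public static void main(String[] args) {
        ExecutorService ioPool = Executors.newCachedThreadPool();
        try {
            Supplier<CompletableFuture<String>> deferred = defer(() -> blockingSave("m1"), ioPool);
            System.out.println("calls before start: " + SAVE_CALLS.get()); // nothing has run yet
            System.out.println(deferred.get().join());                     // runs on an ioPool thread
            System.out.println("calls after start: " + SAVE_CALLS.get());
        } finally {
            ioPool.shutdown();
        }
    }
}
```

Building the wrapper does not touch the database; the blocking call happens only once the work is started, and then on a pool thread rather than on the caller's thread.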
Prurient answered 21/12, 2018 at 8:46 Comment(0)

Spring Data supports reactive repository interfaces for MongoDB and Cassandra.

Spring Data MongoDB reactive interface

Spring Data MongoDB provides reactive repository support with Project Reactor and RxJava 1 reactive types. The reactive API supports reactive type conversion between reactive types.

public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, String> {

    Flux<Person> findByLastname(String lastname);

    @Query("{ 'firstname': ?0, 'lastname': ?1}")
    Mono<Person> findByFirstnameAndLastname(String firstname, String lastname);

    // Accept parameter inside a reactive type for deferred execution
    Flux<Person> findByLastname(Mono<String> lastname);

    Mono<Person> findByFirstnameAndLastname(Mono<String> firstname, String lastname);

    @InfiniteStream // Use a tailable cursor
    Flux<Person> findWithTailableCursorBy();

}

public interface RxJava1PersonRepository extends RxJava1CrudRepository<Person, String> {

    Observable<Person> findByLastname(String lastname);

    @Query("{ 'firstname': ?0, 'lastname': ?1}")
    Single<Person> findByFirstnameAndLastname(String firstname, String lastname);

    // Accept parameter inside a reactive type for deferred execution
    Observable<Person> findByLastname(Single<String> lastname);

    Single<Person> findByFirstnameAndLastname(Single<String> firstname, String lastname);

    @InfiniteStream // Use a tailable cursor
    Observable<Person> findWithTailableCursorBy();
}
Patriapatriarch answered 27/8, 2017 at 5:3 Comment(1)
In the original question, the database result needs to be returned as the response to the RESTful call. In this scenario, is there really any use in Flux and R2DBC? It has to be blocking anyway. - Paediatrics
