Spring Webflux using a blocking HttpClient in a Reactive Stack
Asked Answered
B

1

6

I am currently on a Project that builds Microservices, and are trying to move from the more traditional Spring Boot RestClient to Reactive Stack using Netty and WebClient as the HTTP Client in order to connect to backend systems.

This is going well for backends with REST APIs, however I'm still having some difficulties implementing WebClient to services that connect to SOAP backends and Oracle databases, which still uses traditional JDBC.

I managed to find some workaround online regarding JDBC calls that make use of parallel schedulers to publish the result of the blocking JDBC call:

//the method that is called by @Service
@Override
public Mono<TransactionManagerModel> checkTransaction(String transactionId, String channel, String msisdn) {
    return asyncCallable(() -> checkTransactionDB(transactionId, channel, msisdn))
            .onErrorResume(error -> Mono.error(error));
}

...

//the actual JDBC call
private TransactionManagerModel checkTransactionDB(String transactionId, String channel, String msisdn) {
...
    List<TransactionManagerModel> result = 
                    jdbcTemplate.query(CHECK_TRANSACTION, paramMap, new BeanPropertyRowMapper<>(TransactionManagerModel.class));
...
}

//Generic async callable
private <T> Mono<T> asyncCallable(Callable<T> callable) {
    return Mono.fromCallable(callable).subscribeOn(Schedulers.parallel()).publishOn(transactionManagerJdbcScheduler);
}

and I think this works quite well.

While for SOAP calls, what I did was encapsulating the SOAP call in a Mono while the SOAP call itself is using a CloseableHttpClient which is obviously a blocking HTTP Client.

//The method that is being 'reactive'
public Mono<OfferRs> addOffer(String transactionId, String channel, String serviceId, OfferRq request) {
...
    OfferRs result = adapter.addOffer(transactionId, channel, generateRequest(request));
...
}

//The SOAP adapter that uses blocking HTTP Client
public OfferRs addOffer(String transactionId, String channel, JAXBElement<OfferRq> request) {
...
    response = (OfferRs) getWebServiceTemplate().marshalSendAndReceive(url, request, webServiceMessage -> {
            try {
                SoapHeader soapHeader = ((SoapMessage) webServiceMessage).getSoapHeader();
                    
                ObjectFactory headerFactory = new ObjectFactory();
                AuthenticationHeader authHeader = headerFactory.createAuthenticationHeader();
                authHeader.setUserName(username);
                authHeader.setPassWord(password);
                    
                JAXBContext headerContext = JAXBContext.newInstance(AuthenticationHeader.class);
                Marshaller marshaller = headerContext.createMarshaller();
                marshaller.marshal(authHeader, soapHeader.getResult());
            } catch (Exception ex) {
                log.error("Failed to marshall SOAP Header!", ex);
            }
        });
        return response;
...
}

My question is: Does this implementation for SOAP calls "reactive" enough that I won't have to worry about some calls being blocked in some part of the microservice? I have already implemented reactive stack - calling a block() explicitly will throw an exception as it's not permitted if using Netty.

Or should I adapt the use of parallel Schedulers in SOAP calls as well?

Bullfrog answered 6/3, 2021 at 13:7 Comment(7)
one option for doing reactive relation database calls is R2DBC - r2dbc.io/https://r2dbc.io There is an equivalent spring data library for this.Galvan
@MichaelMcFadyen I have taken a look at R2DBC, but unfortunately support for Oracle DB drivers were....lacking. Oracle reactive drivers require JDK11 while my project still stucked at 8. docs.oracle.com/en/database/oracle/oracle-database/21/jjdbc/… For SOAP though, I'm still unsureBullfrog
ah good point. I didn't know there was no support for the oracle driver yet TIL.Galvan
all blocking calls should be placed on their own scheduler , so that they dont block any of the regular scheduled threads. Your soap requests should probably be done as you have done with the jdbc calls. The only thing i have an opinion about is that Schedulers.parallel() will create workers on multiple cores, which is probably not needed due to the fact that there is a setup time and using multiple cores is usually only needed if there is cpu intensive work. The second thing is the use of publishOn.Pachston
So what you have declared now is that when someone subscribes, they will automatically be assigned a thread on a random core, then when we reach the publishOn statement the current thread will switch from the designated thread on the designated core to a thread in the transactionManagerJdbcScheduler (i dont know what type of Scheduler this is. I'd probably just start out with a single Schedulers.boundedElastic() placed on onSubscribe as this scheduler will scale up and down as needed and after 60s remove unused threads. It has a cap of cpu cores x 10 threads.Pachston
@Pachston sorry, I forgot to mention that transactionManagerJdbcScheduler was this implementation: Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize)); where connectionPoolSize is the amount of max conn pool the JDBC is set (with Hikari); which after some thought, might be an overkill due to the amount of threads it will create. I'm thinking about using boundedElastic() or Executors.newCachedThreadPool() instead.Bullfrog
@Pachston However, will using different Schedulers for Publish and Subscribe affect the performance due to context switching?Bullfrog
P
6

After some discussions i'll write an answer.

Reactor documentation states that you should place blocking calls on their own schedulers. Thats basically to keep the non-blocking part of reactor going, and if something comes in that blocks, then reactor will fallback to traditional servlet behaviour which means assigning one thread to each request.

Reactor has very good documentation about schedulers their types etc.

But short:

onSubscribe

When someone subscribes, reactor will go into something called the assembly phase which means it will basically from the subscribe point start calling the operators backwards upstream until it finds a producer of data (for example a database, or another service etc). If it finds a onSubscribe-operator somewhere during this phase it will place this entire chain on its own defined Scheduler. So one good thing to know is that placement of the onSubscribe does not really matter, as long as it is found during the assembly phase the entire chain will be affected.

Example usage could be:

We have blocking calls to a database, slow calls using a blocking rest client, reading a file from the system in a blocking manor etc.

onPublish

if you have onPublish somewhere in the chain during the assembly phase the chain will know that where it is placed the chain will switch from the default scheduler to the designated scheduler at that specific point. So onPublish placement DOES matter. As it will switch at where it is placed. This operator is more to control that you want to place something on a specific scheduler at specific point in the code.

Examples usage could be:

You are doing some heavy blocking cpu calculations at a specific point, you could switch to a Scheduler.parallell() that will guarantee that all calculations will be placed on separate cores do do heavy cpu work, and when you are done you could switch back to the default scheduler.

Above example

Your soap calls should be placed on its own Scheduler if they are blocking and i think onSubscribe will be enough with a usage of a Schedulers.elasticBound() will be fine to get traditional servlet behaviour. If you feel like you are scared of having every blocking call on the same Scheduler, you could pass in the Scheduler in the asyncCallable function and split up calls to use different Schedulers.

Pachston answered 6/3, 2021 at 22:26 Comment(1)
Thank you! I can adjust more to the JDBC call as well after this discussion. Glad to be able to learn more about reactive.Bullfrog

© 2022 - 2024 — McMap. All rights reserved.