Spring Webflux and @Cacheable - proper way of caching result of Mono / Flux type

I'm learning Spring WebFlux, and while writing a sample application I ran into a concern about combining reactive types (Mono/Flux) with Spring Cache.

Consider the following code-snippet (in Kotlin):

@Repository
interface TaskRepository : ReactiveMongoRepository<Task, String>

@Service
class TaskService(val taskRepository: TaskRepository) {

    @Cacheable("tasks")
    fun get(id: String): Mono<Task> = taskRepository.findById(id)
}

Is this a valid and safe way of caching method calls that return Mono or Flux? Are there other recommended approaches?

The code above works with SimpleCacheResolver but by default fails with Redis because Mono is not Serializable. To make it work, a different serializer (e.g. Kryo) needs to be used.

Dagny answered 8/1, 2018 at 18:53 Comment(0)

Hack way

For now, there is no fluent integration of @Cacheable with Reactor 3. However, you can work around that by adding the .cache() operator to the returned Mono:

@Repository
interface TaskRepository : ReactiveMongoRepository<Task, String>

@Service
class TaskService(val taskRepository: TaskRepository) {

    @Cacheable("tasks")
    fun get(id: String): Mono<Task> = taskRepository.findById(id).cache()
}

This hack caches and shares the data returned from taskRepository. Spring's @Cacheable, in turn, caches a reference to the returned Mono and hands that same reference back on later calls. In other words, it is a cache of a Mono which itself holds the cached value :).

Reactor Addons Way

There is an add-on to Reactor 3 (Reactor Addons) which allows fluent integration with modern in-memory caches such as Caffeine, JCache, etc. Using that technique you can cache your data easily:

@Repository
interface TaskRepository : ReactiveMongoRepository<Task, String>

@Service
class TaskService(
    val taskRepository: TaskRepository,
    val manager: CacheManager // constructor-injected by Spring
) {

    fun get(id: String): Mono<Task> =
        CacheMono.lookup(reader(), id)
            .onCacheMissResume { taskRepository.findById(id) }
            .andWriteWith(writer())

    // Reads the cached Signal; an absent entry yields an empty Mono (a cache miss)
    @Suppress("UNCHECKED_CAST")
    fun reader() = CacheMono.MonoCacheReader<String, Task> { key ->
        Mono.justOrEmpty<Signal<out Task>>(manager.getCache("tasks")?.get(key)?.get() as Signal<out Task>?)
    }

    // Stores the Signal produced on a cache miss
    fun writer() = CacheMono.MonoCacheWriter<String, Task> { key, signal ->
        Mono.fromRunnable<Void> { manager.getCache("tasks")?.put(key, signal) }
    }
}

Note: Reactor Addons caches its own abstraction, Signal<T>, rather than the raw value, so don't worry about that and just follow the convention.

Entente answered 8/1, 2018 at 19:23 Comment(13)
Thanks for the valuable hints, but the question still stands: is serializing and caching the Mono object itself risky or considered bad practice? I would like to use @Cacheable in combination with Redis to move the cache outside of the application memory. - Dagny
Unfortunately, the better way is to integrate with Redis manually: use the second approach combined with, in your case, Spring Data Redis. - Entente
The "Reactor Addons way" above needs to be integrated into @Cacheable at some point in the future to cache the result held by the Mono. Caching a Mono instance itself doesn't make sense, any more than trying to cache a plain Runnable or a Future. - Chapen
I have a question about the Reactor Addons way. What about multithreading? If two threads want to get data from the cache and the cache is empty, will we get two calls to the DB? - Perloff
@Perloff at the moment there is no additional synchronization between callers, so two calls to the DB may indeed happen. To avoid the race you have to add call multiplexing yourself. I will add a sample to the answer. - Entente
@OlehDokuka I guess that since the blocking CacheManager operations are wrapped in a Mono, we should be careful about which Scheduler this runs on (e.g. elastic()), right? - Bessel
@ChristopheBornet that is true. It is recommended to use non-blocking caches if possible (e.g. lettuce.io) or to take additional care to run the task on a dedicated Scheduler. - Entente
Do you know if the @Cacheable and .cache() solution leaks memory? @Ilker below recommends using .cache(ttl) with a ttl ≥ that of the cache configuration, if I understand correctly. Do you know if that is needed? - Forsberg
@OlehDokuka, is there any simple hack for Kotlin coroutines (suspend functions)? - Improbity
For the first approach I found that I also needed the annotation @CacheConfig(cacheNames = ....) above the class so that the cache actually updates the previous value after getting the result when configured with a time to live. Thanks for the helpful hack. - Bonitabonito
There is a serious flaw in the Hack way. If the Mono carries an error (in this case, an exception thrown inside taskRepository.findById(id)), the error is cached and emitted for as long as the cache entry is valid. - Pedicel
@OlehDokuka with the Hack way solution I get "DefaultSerializer requires a Serializable payload but received an object of type [reactor.core.publisher.MonoCacheTime]" (Project Reactor). Maybe I'm missing something? - Declivous
And of course this does not work if you use Redis, because Mono is not serializable. - Cytogenetics
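
The two concerns raised in the comments above (two concurrent misses both hitting the DB, and a failed lookup being replayed from the cache) can be sketched with the JDK alone. This is only an analogy, not Reactor code: CompletableFuture stands in for Mono, and CoalescingCache is a hypothetical name. With Reactor, the map value would presumably be the Mono with .cache() applied.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical helper: coalesces concurrent lookups of one key into a single load. */
class CoalescingCache<K, V> {
    private final ConcurrentHashMap<K, CompletableFuture<V>> entries = new ConcurrentHashMap<>();
    private final Function<K, CompletableFuture<V>> loader;

    CoalescingCache(Function<K, CompletableFuture<V>> loader) {
        this.loader = loader;
    }

    CompletableFuture<V> get(K key) {
        // computeIfAbsent is atomic per key, so callers racing on a miss share one future.
        CompletableFuture<V> future = entries.computeIfAbsent(key, loader);
        // Evict a failed load so that the error is not served from the cache on later calls.
        future.whenComplete((value, error) -> {
            if (error != null) {
                entries.remove(key, future);
            }
        });
        return future;
    }
}
```

Successful entries are never evicted in this sketch, so a real cache would still need a size bound or a time to live on top of it.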

I have used Oleh Dokuka's hacky solution and it worked great, but there is a catch. You must use a longer Duration for the Flux cache than your @Cacheable cache's time-to-live value. If you don't give the Flux cache a duration, it will never invalidate itself (the Flux documentation says "Turn this Flux into a hot source and cache last emitted signals for further Subscriber."). So a Flux cache of 2 minutes with a time-to-live of 30 seconds is a valid configuration. If the Ehcache timeout occurs first, a new Flux cache reference is generated and used.

Helpmeet answered 25/1, 2019 at 10:53 Comment(1)
Are you saying that if I use @Cacheable and .cache() it leaks memory? Do I need to explicitly call .cache(ttl) with a ttl ≥ that of the cache configuration? - Forsberg
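
To make the "inner" time to live discussed in this answer a little more concrete, here is a minimal JDK-only sketch of a value holder that goes back to its source once its own TTL has elapsed. It is meant as a rough model of what a timed .cache(Duration) does for later subscribers, not as Reactor code. ExpiringHolder and its injectable clock are hypothetical names introduced for illustration.

```java
import java.time.Duration;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical stand-in for a cached value that carries its own time to live. */
class ExpiringHolder<T> {
    private final Supplier<T> source;
    private final long ttlNanos;
    private final LongSupplier clock; // nanosecond clock, injectable so tests can control time
    private T value;
    private long loadedAt;
    private boolean loaded;

    ExpiringHolder(Supplier<T> source, Duration ttl, LongSupplier clock) {
        this.source = source;
        this.ttlNanos = ttl.toNanos();
        this.clock = clock;
    }

    synchronized T get() {
        long now = clock.getAsLong();
        // Reload on first use, or once the holder's own TTL has elapsed.
        if (!loaded || now - loadedAt >= ttlNanos) {
            value = source.get();
            loadedAt = now;
            loaded = true;
        }
        return value;
    }
}
```

This models only the inner holder. The outer @Cacheable time to live separately decides when the holder itself is dropped from the cache and a new one is created on the next call.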

As of today (Spring Framework 6.1 and later), caching of Mono<T> or Flux<T> works as expected. See the Spring caching annotation documentation.

This caches the returned task for the given key as expected:

@Cacheable("tasks")
public Mono<Task> getTask(String id) {...}
Myrmecophagous answered 28/6, 2024 at 13:13 Comment(0)

// In a Facade:

public Mono<HybrisResponse> getProducts(HybrisRequest request) {
    return Mono.just(HybrisResponse.builder().build());
}

// In a service layer:

@Cacheable(cacheNames = "embarkations", unless = "#result == null")
public HybrisResponse cacheable(HybrisRequest request) {
    LOGGER.info("executing cacheable");
    // only reached on a cache miss; null marks the miss and is not cached
    return null;
}

@CachePut(cacheNames = "embarkations")
public HybrisResponse cachePut(HybrisRequest request) {
    LOGGER.info("executing cachePut");
    // note: block() must not be called on a non-blocking Reactor thread
    return hybrisFacade.getProducts(request).block();
}

// In a Controller:

// get from cache (null means a miss)
HybrisResponse hybrisResponse = productFeederService.cacheable(request);

if (hybrisResponse == null) {
   // if not in cache then cache it
   hybrisResponse = productFeederService.cachePut(request);
}

return Mono.just(hybrisResponse)
    .map(result -> ResponseBody.<HybrisResponse>builder()
        .payload(result).build())
    .map(ResponseEntity::ok);
Creamcolored answered 15/8, 2019 at 20:18 Comment(1)
Hybris is reactive now? - Brinkema
