What is the preferred way to add a Caffeine cache to Kotlin with coroutines?
D

3

5

I am trying to integrate a Caffeine cache into a Kotlin + Spring Boot application, but I run into the problem of calling a suspending function from a non-coroutine body. I understand why this happens, but I am looking for a more standard solution. The only solution I can find on the web leads back to SO, and I do not see a stable way to fix this there.

inMemoryCache.get(id) { id ->
   some call to external service <--- "Suspension function can be called only within coroutine body"
}
Decennary answered 29/3, 2022 at 17:9 Comment(10)
I am not a Kotlin developer, but I think using an AsyncCache with conversions between Deferred and CompletableFuture is the most correct translation. - Regelation
@BenManes I wish I could understand how to do that. I can get a CompletableFuture, but I am not sure I know the next step. - Decennary
OK, I gave up... not sure if this is the way backend development should head. Coroutines make even a basic thing 10x more complicated, and the benefit is so tiny. - Decennary
There is kotlinx-coroutines-jdk8, which adds the Deferred.asCompletableFuture and CompletionStage.asDeferred converters. - Regelation
Java's upcoming Virtual Threads will be like Go's, where threads are cheap coroutines and transparent as just normal blocking code. That's a better backend experience in my opinion, but sadly big JVM features take a very long time to deliver. - Regelation
@Decennary "all the coroutines make it 10x more complicated and the benefit is so tiny" - then don't use them? I'm not trying to be rude, I'm serious. Kotlin coroutines are kind of tricky and they need some learning, but in many cases they provide great benefits and make concurrent programming a hell of a lot easier and less error-prone. If you personally don't benefit from them, then just don't use them :-) - Brazell
I'd love to have a long discussion about this topic, but this is probably not a good platform for it :) From my point of view, Kotlin and coroutines are great for Android, where you can really benefit from different things happening at the same time (main thread, UI thread, networking, etc.). In microservices, however, you do not usually have to cope with a lot of threads; in 99% of cases you have CRUD operations. Even Go made it super simple... - Decennary
In the JVM world... they try to bring reactive to the backend, and developers cannot use the tools they have been using for the last 10 years, because it is just a different approach. What is the benefit? Memory? What about solving the problem that a JVM in Kubernetes needs 1 GB of memory just to not crash with an OOM? Instead, they are integrating a reactive approach, scratching the head with the opposite hand. - Decennary
@Decennary That is why the JVM is moving towards a Go-style approach. Threads will no longer be 1:1 with kernel threads, but be implemented as coroutines internally. That keeps the programming model you prefer with the memory and latency benefits that user-mode threads provide. This is still 12-24 months out, sadly. For the reasons you give, the reactive style isn't the default for backend developers, and there is an inclination to wait rather than rewrite if the complexity is unnecessary. - Regelation
@BenManes Thank you very much for this comment. The thing is, I did not choose this path on my current project. I am still trying to solve simple things that took 10 seconds in non-reactive Java + Spring, but this... this is just a step back. To get this working, one needs to add dependency hell to the project (coroutines, JetBrains libraries, Reactor, etc.)... this must end; Java/Kotlin needs to make it more enjoyable for developers. Looking forward to any resemblance to Go's simplicity. - Decennary
K
5

Observe (a) the signature of Caffeine's AsyncCache::get:

public interface AsyncCache<K, V> {
  CompletableFuture<V> get(K key,
    BiFunction<? super K, Executor, CompletableFuture<V>> mappingFunction);
}

and (b) Kotlin coroutines signatures:

public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R

public fun <T> CoroutineScope.future(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
) : CompletableFuture<T>

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> //Java

public suspend fun <T> CompletionStage<T>.await(): T

Suppose you have a suspending mapping function myCreate. You can use CoroutineScope.future() to wrap it in a CompletableFuture, return that future from the mapping function passed to AsyncCache::get, and call await() on the result to get back to a suspending call, so that you can leverage structured concurrency.

An example:

import com.github.benmanes.caffeine.cache.AsyncCache
import com.github.benmanes.caffeine.cache.Caffeine
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.future.await
import kotlinx.coroutines.future.future
import java.util.concurrent.TimeUnit
import javax.inject.Named

@Named
class CacheStore
{
    class Entry (val value: Double)

    val cache: AsyncCache<String, Entry> = Caffeine.newBuilder()
        .maximumSize(10_000)
        .expireAfterWrite(1, TimeUnit.HOURS)
        .buildAsync()

    suspend fun get( key: String, 
                     create: suspend CoroutineScope.() -> Entry
    ): Entry = coroutineScope {
        val fut = cache.get(key) { _, _ -> future { create() } }
        fut.await()
    }
}

...

suspend fun invoke(): CacheStore.Entry {
    val entry = cacheStore.get(key) {
        // logging or other logic
        myCreate(arg)
    }
    return entry
}

suspend fun myCreate(arg: Double): CacheStore.Entry {
    ...
}

Ref: the official KEEP coroutines proposal on how to convert between callbacks, futures, and suspending functions:

https://github.com/Kotlin/KEEP/blob/master/proposals/coroutines.md#asynchronous-programming-styles

Killam answered 17/11, 2022 at 5:7 Comment(0)
B
3

You cannot call a suspending function directly inside the cache's loading function, because that function is an ordinary (non-suspending) lambda, not a coroutine.

You have several options.

  1. If you don't mind "wasting" a thread, and you are using a Cache or LoadingCache you can use runBlocking.
inMemoryCache.get(id) { id ->
   runBlocking {
     some call to external service
   }
}
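  2. You can convert the external call to a Future if you are using an AsyncCache or AsyncLoadingCache. Note you must create a CoroutineScope in order to call async. The mapping function must return the CompletableFuture itself; await() is then called on the result of get, from a suspending function.
inMemoryCache.get(id) { key, _ ->
  scope.async { compute(key) }.asCompletableFuture()
}.await()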
  3. Use a Kotlin wrapper for Caffeine.
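
For reference, here is a minimal sketch of how the AsyncCache option above could be packaged as a small wrapper of your own. It assumes Caffeine and the kotlinx.coroutines.future extensions are on the classpath. The class name SuspendingCache, the loader parameter, the default scope, and the cache size are all made up for illustration and are not part of Caffeine or kotlinx.coroutines.

```kotlin
import com.github.benmanes.caffeine.cache.AsyncCache
import com.github.benmanes.caffeine.cache.Caffeine
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.future.await
import kotlinx.coroutines.future.future
import kotlinx.coroutines.runBlocking

// Hypothetical wrapper: SuspendingCache is an illustrative name, not a Caffeine class.
class SuspendingCache<K : Any, V : Any>(
    // Scope that runs the loader coroutines (an assumption; pass in your application's scope).
    private val scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
) {
    private val cache: AsyncCache<K, V> = Caffeine.newBuilder()
        .maximumSize(1_000) // arbitrary size for the sketch
        .buildAsync()

    // The mapping function only *returns* a CompletableFuture;
    // suspension happens outside of it, in await().
    suspend fun get(key: K, loader: suspend (K) -> V): V =
        cache.get(key) { k, _ -> scope.future { loader(k) } }.await()
}

fun main() = runBlocking {
    val cache = SuspendingCache<String, Int>()
    var loads = 0
    // The second call should be served from the cache without invoking the loader.
    val first = cache.get("a") { loads++; it.length }
    val second = cache.get("a") { loads++; it.length }
    println("first=$first second=$second loads=$loads")
}
```

Note that, per the kotlinx.coroutines documentation, CompletionStage.await() cancels the underlying future when the awaiting coroutine is cancelled; if the cached future is shared between callers and that is undesired, asDeferred().await() is the documented alternative.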
Borne answered 25/9, 2022 at 3:10 Comment(2)
(1) I don't think Kotlin plays nicely with using runBlocking within a ConcurrentHashMap compute-style call. See Netflix's issue which switched to option (2) (1, 2). - Regelation
Having read both of those links, it seems they were just doing recursive calls to the cache, which has nothing to do with runBlocking itself. runBlocking is just a way of providing a coroutineScope from a regular Java Thread, so suspendable functions have a scope in which to execute. If you were to do any other blocking call inside your cache callback, it would be the exact same thing. - Borne
S
0

I have written this library. It can solve this problem, so please use it if you like. https://github.com/be-hase/caffeine-coroutines

Spay answered 20/5 at 6:23 Comment(0)
