Pipeline Redis commands with Reactive Lettuce

I'm using Spring Boot WebFlux + Project Reactor + Lettuce to connect to and query Redis in a non-blocking way. I have configured the ReactiveRedisTemplate with a LettuceConnectionFactory. The Spring documentation states that the only way to use pipelining with ReactiveRedisTemplate is the execute(<RedisCallback>) method. In the non-reactive RedisTemplate, I see that there is an executePipelined(<RedisCallback>) method, which opens and closes a pipeline around the callback. But ReactiveRedisTemplate.execute uses a LettuceReactiveRedisConnection, and neither Spring's ReactiveRedisConnection nor Lettuce has any reference to a pipeline.

So my question is: is it possible to pipeline commands when using Spring ReactiveRedisTemplate + ReactiveLettuceConnection?

I also noticed that using ReactiveRedisTemplate.execute with a RedisCallback that issues multiple Redis commands runs slower than just calling the commands individually.

Sample code for pipelining with ReactiveRedisTemplate:

reactiveRedisTemplate.execute(connection -> keys.flatMap(key ->
            connection.hashCommands()
                    .hGetAll(ByteBuffer.wrap(key.getBytes()))))
        .map(Map.Entry::getValue)
        .map(ByteUtils::getBytes)
        .map(b -> {
            try {
                return mapper.readValue(b, Value.class);
            } catch (IOException e1) {
                return null;
            }
        })
        .collectList();

Code without pipeline:

keys.flatMap(key -> reactiveRedisTemplate.opsForHash().entries(key))
        .map(Map.Entry::getValue)
        .cast(Value.class)
        .collectList();

Thanks!

Merchantable answered 15/8, 2018 at 21:4 Comment(2)
I have the same problem. I've been googling for a week with no results. Do you have a solution? - Pilgrim
@kriver, did you find the answer to your question? Please update with an answer. - Pardner

I don't think it's possible using ReactiveRedisTemplate or the reactive API of Lettuce. When you construct a reactive chain, parts of it are evaluated lazily.

getAsyncValue(a).flatMap(value -> doSomething(value)).subscribe()

For instance, in this sample doSomething is triggered only once getAsyncValue has emitted a value.
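
To make that ordering visible without any Redis setup, here is a minimal sketch using the JDK's CompletableFuture as a stand-in for the reactive types. This is an analogy only: CompletableFuture is eager while Reactor does nothing until subscribe(), and LazyChainDemo and its log are hypothetical names. The part that carries over is that the second step cannot run before the first one produces a value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Analogy only: CompletableFuture stands in for Mono; this is not Reactor code.
public class LazyChainDemo {

    // Builds the chain, then completes the source, and records the order of events.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Stands in for getAsyncValue(a): no value is available yet.
        CompletableFuture<String> asyncValue = new CompletableFuture<>();

        // Stands in for flatMap(value -> doSomething(value)).
        CompletableFuture<Integer> chain = asyncValue.thenCompose(value -> {
            log.add("doSomething(" + value + ")");
            return CompletableFuture.completedFuture(value.length());
        });

        // The chain exists, but doSomething has not been invoked.
        log.add("chain built");

        // Only now does the first step produce a value, which triggers doSomething.
        asyncValue.complete("hello");
        log.add("result=" + chain.join());
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running it prints "chain built" first, then "doSomething(hello)", then "result=5".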

Now take your RedisCallback sample and assume the connection object had a flushAll method (hypothetical: one that flushes buffered commands to the server, not the Redis FLUSHALL command). Where and when would you call it?

tpl.execute(connection -> {
    Flux<Map.Entry<ByteBuffer, ByteBuffer>> results = keys.flatMap(key ->
            connection.hashCommands()
                    .hGetAll(ByteBuffer.wrap(key.getBytes())));
    // flushAll() is the hypothetical flush method assumed above
    connection.flushAll();
    return results;
})

Written like this, no commands will be flushed to the server, because no hashCommands call has been triggered yet when the flush runs.

Now let's look at all the signal callbacks available:

  • doOnNext
  • doOnError
  • doOnCancel
  • doFirst
  • doOnSubscribe
  • doOnRequest
  • doOnTerminate
  • doAfterTerminate
  • doFinally
  • doOnComplete

doOnError and doOnCancel won't help us. But we could think about using doFinally, doOnTerminate, or doAfterTerminate:

tpl.execute(connection -> keys.flatMap(key -> connection.hashCommands()
                .hGetAll(ByteBuffer.wrap(key.getBytes())))
        .doFinally(s -> connection.flushAll()))

But hGetAll won't finish until the commands are flushed to the server, so doFinally is never called, and so we are never able to flush.
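
The circular dependency can be reproduced with a toy model in plain Java. This is a sketch under stated assumptions, not the Lettuce or Spring Data Redis API: BufferedConnection, get and flush are hypothetical names, and a CompletableFuture stands in for each pending command. A command completes only when flush() runs, so a flush hooked onto completion never fires, while a flush issued after all commands are queued completes every one of them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Toy model only: NOT the Lettuce or Spring Data Redis API.
public class FlushModel {

    static final List<String> KEYS = Arrays.asList("a", "b", "c");

    // Hypothetical connection that buffers commands until flush() is called.
    static class BufferedConnection {
        private final Queue<Runnable> pending = new ArrayDeque<>();

        // Queues a command; its future completes only when flush() runs.
        CompletableFuture<String> get(String key) {
            CompletableFuture<String> result = new CompletableFuture<>();
            pending.add(() -> result.complete("value-of-" + key));
            return result;
        }

        // "Sends" every buffered command, completing its future.
        void flush() {
            Runnable command;
            while ((command = pending.poll()) != null) {
                command.run();
            }
        }
    }

    static int countDone(List<CompletableFuture<String>> futures) {
        int done = 0;
        for (CompletableFuture<String> f : futures) {
            if (f.isDone()) {
                done++;
            }
        }
        return done;
    }

    // Flush hooked onto completion (the doFinally idea): it never runs,
    // because completion itself is waiting for the flush.
    static int flushInsideCallback() {
        BufferedConnection connection = new BufferedConnection();
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String key : KEYS) {
            futures.add(connection.get(key).whenComplete((v, e) -> connection.flush()));
        }
        return countDone(futures);
    }

    // Flush issued after every command has been queued: all of them complete.
    static int flushAfterQueueing() {
        BufferedConnection connection = new BufferedConnection();
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String key : KEYS) {
            futures.add(connection.get(key));
        }
        connection.flush();
        return countDone(futures);
    }

    public static void main(String[] args) {
        System.out.println("flush inside callback: " + flushInsideCallback() + " of 3 done");
        System.out.println("flush after queueing:  " + flushAfterQueueing() + " of 3 done");
    }
}
```

The first variant reports 0 of 3 commands done and the second 3 of 3, so the flush has to be driven from outside the command results, once everything is queued.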

The only workaround I can think of is to use Lettuce's async API directly. There is a sample in the documentation showing how to do it.

Your code could look like this (not tested):

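// client is a RedisClient from Lettuce; keys is the Flux of keys from the question

StatefulRedisConnection<String, String> connection = client.connect();
RedisAsyncCommands<String, String> command = connection.async();
// Buffer commands instead of writing each one to the server immediately
command.setAutoFlushCommands(false);
keys.map(command::hgetall)                     // queue one HGETALL per key
        .collectList()                         // wait until every command is queued
        .doOnNext(f -> command.flushCommands()) // send the whole batch at once
        .flatMapMany(f -> Flux.fromIterable(f).flatMap(Mono::fromCompletionStage))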
Cess answered 28/10, 2021 at 9:42 Comment(0)
