使用反应式Lettuce将Redis命令进行管道化

22

我正在使用 spring boot webflux + project reactor + lettuce 以非阻塞方式连接和查询Redis。

我已经使用 LettuceConnectionFactory 配置了 ReactiveRedisTemplate。Spring文档说明,使用流水线的唯一方法是使用 execute(<RedisCallback>) 方法。在非反应式的 RedisTemplate 中,我看到有一个 executePipelined(<RedisCallback>) 方法,在执行回调之前打开/关闭管道。但是,在 ReactiveRedisTemplate.execute 方法的情况下,它使用 LettuceReactiveRedisConnection,而不是 Spring ReactiveRedisConnectionLettuce 没有参考流水线。

因此,我的问题是,在使用 Spring ReactiveRedisTemplate + ReactiveLettuceConnection 时是否可能对命令进行流水线处理?

我还注意到,使用具有多个Redis命令的 RedisCallbackReactiveRedisTemplate.execute 比单独调用命令执行得更慢。

使用 ReactiveRedisTemplate 进行流水线处理的示例代码:

reactiveRedisTemplate.execute(connection -> keys.flatMap(key -> 
                                connection.hashCommands()
                                .hGetAll(ByteBuffer.wrap(key.getBytes()))))
                    .map(Map.Entry::getValue)
                    .map(ByteUtils::getBytes)
                    .map(b -> {
                        try {
                        return mapper.readValue(b, Value.class);
                        } catch (IOException e1) {
                        return null;
                        }
                    })
                    .collectList();

没有流水线的代码:

keys.flatMap(key -> reactiveRedisTemplate.opsForHash().entries(key))
            .map(Map.Entry::getValue)
            .cast(Value.class)
            .collectList();

谢谢!


1
我遇到了同样的问题,已经在谷歌上搜索了一周,但没有结果,你有什么解决方案吗? - aswzen
@kriver。你找到你的问题的答案了吗?请更新回答。 - Zakir saifi
1个回答

0

我认为使用RedisReactiveTemplate或lettuce的响应式api不可能实现。当您构建响应式链时,有些部分会被惰性地评估。

getAsyncValue(a).flatMap(value -> doSomething(value)).subscribe()

例如,在这个示例中,只有在 getAsyncValue 返回一个值时,doSomething 才会被触发。

现在,如果我们采用您的 RedisCallback 示例,并假设我们在连接对象中有一个 flushAll 方法。您在哪里/何时调用它?

tpl.execute(connection -> {
                    Flux<Map.Entry<ByteBuffer, ByteBuffer>> results = keys.flatMap(key ->
                            connection.hashCommands()
                                    .hGetAll(ByteBuffer.wrap(key.getBytes())));
                    connection.fluxAll();
                    return results;
                })

像这样,因为没有触发hashCommands,所以命令不会被刷新到服务器。

现在让我们看看所有的信号回调:

  • doOnNext
  • doOnError
  • doOnCancel
  • doFirst
  • doOnSubscribe
  • doOnRequest
  • doOnTerminate
  • doAfterTerminate
  • doFinally
  • doOnComplete

doOnError或doOnCancel对我们没有帮助。但是我们可以考虑使用doFinally、doOnTerminate、doAfterTerminate:

tpl.execute(connection -> keys.flatMap(key -> connection.hashCommands()
                                        .hGetAll(ByteBuffer.wrap(key.getBytes())))
                                        .doFinally(s -> connection.flushAll()))

但是 htGetAll 不会完成,直到命令被刷新到服务器,因此 doFinally 不会被调用,所以我们将无法刷新....

我能想到的唯一解决方法是直接使用 lettuce 的异步 API。文档中有一个示例,说明如何操作。

您的代码可能如下(未经测试):

// client is a RedisClient from lettuce

StatefulRedisConnection<String, String> connection = client.connect();
RedisAsyncCommands<String, String> command = connection.async();
command.setAutoFlushCommands(false);
keys.map(command::hgetall)
                .collectList()
                .doOnNext(f -> command.flushCommands())
                .flatMapMany(f -> Flux.fromIterable(f).flatMap(Mono::fromCompletionStage))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接