Spring Webflux和@Cacheable - 缓存Mono / Flux类型结果的正确方法

30

我正在学习Spring WebFlux,在编写示例应用程序时,我发现与Reactive类型(Mono/Flux)结合使用Spring Cache存在一些问题。

请考虑以下Kotlin代码片段:

@Repository
interface TaskRepository : ReactiveMongoRepository<Task, String>

@Service
class TaskService(val taskRepository: TaskRepository) {

    @Cacheable("tasks")
    fun get(id: String): Mono<Task> = taskRepository.findById(id)
}

这是缓存返回Mono或Flux的方法调用的有效且安全的方式吗?也许有其他原则来做到这一点吗?

以下代码可与SimpleCacheResolver一起正常工作,但默认情况下由于Mono不可序列化而在Redis上失败。为使它们工作(例如需要使用Kryo序列化器)。

3个回答

45

黑客方式

目前,@Cacheable与Reactor 3并没有完全的集成。然而,您可以通过将.cache()操作符添加到返回的Mono来绕过这个问题。

@Repository
interface TaskRepository : ReactiveMongoRepository<Task, String>

@Service
class TaskService(val taskRepository: TaskRepository) {

    @Cacheable("tasks")
    fun get(id: String): Mono<Task> = taskRepository.findById(id).cache()
}

那个从taskRepository数据返回的hack缓存并共享。反过来,Spring Cacheable将缓存返回的Mono的引用,然后返回该引用。换句话说,这是一个保持缓存的单体缓存:)。

Reactor Addons Way

Reactor 3有一个附加功能,可以流畅地与现代内存缓存(如caffeinejcache等)集成。使用这种技术,您可以轻松缓存您的数据:

@Repository
interface TaskRepository : ReactiveMongoRepository<Task, String>

@Service
class TaskService(val taskRepository: TaskRepository) {

    @Autowire
    CacheManager manager;


    fun get(id: String): Mono<Task> = CacheMono.lookup(reader(), id)
                                               .onCacheMissResume(() -> taskRepository.findById(id))
                                               .andWriteWith(writer());

    fun reader(): CacheMono.MonoCacheReader<String, Task> = key -> Mono.<Signal<Task>>justOrEmpty((Signal) manager.getCache("tasks").get(key).get())
    fun writer(): CacheMono.MonoCacheWriter<String, Task> = (key, value) -> Mono.fromRunnable(() -> manager.getCache("tasks").put(key, value));
} 
注意:Reactor插件会缓存自己的抽象,即Signal<T>,因此不必担心并遵循该约定。

1
感谢宝贵的提示,但问题仍然存在:序列化和缓存Mono对象本身是否存在风险或被认为是不良实践?我想使用@Cacheable与Redis结合使用,将缓存移出应用程序内存。 - Tomek Zaremba
3
将"reactor addons way"集成到@Cacheable中,以便在将来的某个时候缓存Mono保存的结果。 缓存Mono实例本身是没有意义的,就像尝试缓存普通的RunnableFuture一样。 - Simon Baslé
2
@SoulCub 在某个时刻,调用者之间没有额外的同步,因此可能会出现两个对数据库的调用。因此,您需要添加额外的调用多路复用以避免竞争。我将在答案中添加示例。 - Oleh Dokuka
1
你知道@Cacheable.cache()解决方案是否会泄漏内存吗?如果我理解正确,@Ilker建议使用.cache(ttl),其中ttl≥缓存配置的ttl。你知道这是否必要吗? - Tobia
1
Hack的方式存在严重缺陷。如果Mono中出现错误,在这种情况下,exception会在taskRepository.findById(id)内部抛出,并且会被缓存和发出,直到缓存失效。 - Piotr
显示剩余8条评论

3
我使用了Oleh Dokuka的hacky解决方案,效果很好,但有一个要点。在Flux缓存中,您必须使用比可缓存缓存的timetolive值更长的Duration。如果您不使用Flux缓存的持续时间,它将无法使其失效(Flux文档中说:“将此Flux转换为热源,并为进一步的订阅者缓存最后发出的信号。”)。因此,将Flux缓存设置为2分钟和timetolive设置为30秒可能是有效的配置。如果ehcache超时先发生,则生成新的Flux缓存引用并使用它。

4
你是在说如果我使用@Cacheable.cache()会导致内存泄漏吗?我需要显式地调用.cache(ttl)并设置ttl ≥ 缓存配置的ttl吗? - Tobia

-2

// 在门面模式中:

public Mono<HybrisResponse> getProducts(HybrisRequest request) {
    return Mono.just(HybrisResponse.builder().build());
}

// 在服务层中:

@Cacheable(cacheNames = "embarkations")
public HybrisResponse cacheable(HybrisRequest request) {
    LOGGER.info("executing cacheable");
    return null;
}

@CachePut(cacheNames = "embarkations")
public HybrisResponse cachePut(HybrisRequest request) {
    LOGGER.info("executing cachePut");
    return hybrisFacade.getProducts(request).block();
}

// 在控制器中:

HybrisResponse hybrisResponse = null;

try {
   // get from cache
   hybrisResponse = productFeederService.cacheable(request);

} catch (Throwable e) {
   // if not in cache then cache it
   hybrisResponse = productFeederService.cachePut(request);
}

return Mono.just(hybrisResponse)
    .map(result -> ResponseBody.<HybrisResponse>builder()
        .payload(result).build())
    .map(ResponseEntity::ok);

1
Hybris现在是响应式的吗? - xuesheng

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接