Spring缓存刷新过期值

21
在一个基于Spring的应用程序中,我有一个服务,它执行某些"Index"的计算。这个"Index"的计算相对比较昂贵(例如,需要1秒钟),但是检查其实际性相对比较便宜(例如,20ms)。实际的代码并不重要,它沿着以下方式进行:
public Index getIndex() {
    return calculateIndex();
}

public Index calculateIndex() {
    // 1 second or more
}

public boolean isIndexActual(Index index) {
    // 20ms or less
}

我正在使用Spring Cache通过@Cacheable注解来缓存计算出的索引:

@Cacheable(cacheNames = CacheConfiguration.INDEX_CACHE_NAME)
public Index getIndex() {
    return calculateIndex();
}

我们目前将 GuavaCache 配置为缓存实现:
@Bean
public Cache indexCache() {
    return new GuavaCache(INDEX_CACHE_NAME, CacheBuilder.newBuilder()
            .expireAfterWrite(indexCacheExpireAfterWriteSeconds, TimeUnit.SECONDS)
            .build());
}

@Bean
public CacheManager indexCacheManager(List<Cache> caches) {
    SimpleCacheManager cacheManager = new SimpleCacheManager();
    cacheManager.setCaches(caches);
    return cacheManager;
}

我还需要检查缓存的值是否仍然有效,如果无效,则最好异步地刷新它。理想情况下,应该按照以下方式进行:

  • 调用getIndex()时,Spring会检查缓存中是否有值。
    • 如果没有,则通过calculateIndex()加载新值并将其存储在缓存中
    • 如果是,则通过isIndexActual(...)检查现有值是否实际。
      • 如果旧值是实际的,则返回旧值。
      • 如果旧值不是实际的,则返回旧值,但从缓存中删除,并触发加载新值。

基本上,我想尽快从缓存中提供值(即使它已过时),同时立即触发更新。

目前为止,我已经做到了检查实际性和清除过期值:

@Cacheable(cacheNames = INDEX_CACHE_NAME)
@CacheEvict(cacheNames = INDEX_CACHE_NAME, condition = "target.isObsolete(#result)")
public Index getIndex() {
    return calculateIndex();
}

如果结果已经过时,此操作会触发逐出并立即返回旧的值,即使情况确实如此。但是,这不会在缓存中刷新值。

是否有一种方法可以配置Spring Cache,在逐出后积极刷新过时的值?

更新

这是一个MCVE示例。

public static class Index {

    private final long timestamp;

    public Index(long timestamp) {
        this.timestamp = timestamp;
    }

    public long getTimestamp() {
        return timestamp;
    }
}

public interface IndexCalculator {
    public Index calculateIndex();

    public long getCurrentTimestamp();
}

@Service
public static class IndexService {
    @Autowired
    private IndexCalculator indexCalculator;

    @Cacheable(cacheNames = "index")
    @CacheEvict(cacheNames = "index", condition = "target.isObsolete(#result)")
    public Index getIndex() {
        return indexCalculator.calculateIndex();
    }

    public boolean isObsolete(Index index) {
        long indexTimestamp = index.getTimestamp();
        long currentTimestamp = indexCalculator.getCurrentTimestamp();
        if (index == null || indexTimestamp < currentTimestamp) {
            return true;
        } else {
            return false;
        }
    }
}

现在开始测试:

@Test
public void test() {
    final Index index100 = new Index(100);
    final Index index200 = new Index(200);

    when(indexCalculator.calculateIndex()).thenReturn(index100);
    when(indexCalculator.getCurrentTimestamp()).thenReturn(100L);
    assertThat(indexService.getIndex()).isSameAs(index100);
    verify(indexCalculator).calculateIndex();
    verify(indexCalculator).getCurrentTimestamp();

    when(indexCalculator.getCurrentTimestamp()).thenReturn(200L);
    when(indexCalculator.calculateIndex()).thenReturn(index200);
    assertThat(indexService.getIndex()).isSameAs(index100);
    verify(indexCalculator, times(2)).getCurrentTimestamp();
    // I'd like to see indexCalculator.calculateIndex() called after
    // indexService.getIndex() returns the old value but it does not happen
    // verify(indexCalculator, times(2)).calculateIndex();


    assertThat(indexService.getIndex()).isSameAs(index200);
    // Instead, indexCalculator.calculateIndex() os called on
    // the next call to indexService.getIndex()
    // I'd like to have it earlier
    verify(indexCalculator, times(2)).calculateIndex();
    verify(indexCalculator, times(3)).getCurrentTimestamp();
    verifyNoMoreInteractions(indexCalculator);
}

我希望在缓存中的值被驱逐后不久就能得到更新。目前情况是在下一次调用 getIndex() 之前刷新。如果驱逐后立即刷新,这将节省我1秒钟。

我尝试过使用 @CachePut,但它也不能带来我想要的效果。值会被刷新,但方法总是会执行,无论 conditionunless 是什么。

目前我唯一看到的方法是调用 getIndex() 两次(第二次是异步/非阻塞)。但那有点傻。


getIndex() 上添加一个额外的注解 @CachePut(cacheNames = INDEX_CACHE_NAME, condition = "target.isObsolete(#result)"),这应该能为您解决问题。 - Bond - Java Bond
@Bond-JavaBond 刚刚测试过了 - 不完全正确。@CachePut 无论如何都会执行该方法,只是不会缓存过时的结果。我想在且仅在结果过时时执行该方法。 - lexicore
我认为使用 @Cacheable 注解无法实现这一点,我一直在寻找这个功能,但从未找到解决方案。你想要的是所谓的自填充缓存,即缓存会自动刷新,但如果刷新仍在进行中,则会返回过时的值。 - john16384
@Bond-JavaBond 不,这不是我在测试中得到的结果。我发现该方法总是被调用,无论结果是否过时。我会尝试为此准备一个MCVE。 - lexicore
@Einstein_AB 抱歉,那是很久以前的事了,我不太记得结果。 - lexicore
显示剩余4条评论
5个回答

7
我认为最简单的做法是创建一个自定义切面,它会透明地完成所有操作,并且可以在更多地方重复使用。
因此,假设您的类路径上有 spring-aop 和 aspectj 依赖项,则以下切面将起到魔术般的作用。
@Aspect
@Component
public class IndexEvictorAspect {

    @Autowired
    private Cache cache;

    @Autowired
    private IndexService indexService;

    private final ReentrantLock lock = new ReentrantLock();

    @AfterReturning(pointcut="hello.IndexService.getIndex()", returning="index")
    public void afterGetIndex(Object index) {
        if(indexService.isObsolete((Index) index) && lock.tryLock()){
            try {
                Index newIndex = indexService.calculateIndex();
                cache.put(SimpleKey.EMPTY, newIndex);
            } finally {
                lock.unlock();
            }
        }
    }
}

需要注意以下几点:

  1. 由于您的getIndex()方法没有参数,因此它将被存储在缓存键SimpleKey.EMPTY中。
  2. 该代码假设IndexService位于hello包中。

1
这个方面的行为与简单的CacheEvict注释相同,对getIndex的调用将一直挂起,直到缓存再次计算。在最好的情况下(单个调用缓存计算),整个线程将被保持,最坏的情况是您永远无法从缓存中获取结果,因为同时调用将强制缓存不断重新计算。 - Ilya Dyoshin
@IlyaDyoshin,反映您需求的情况是@AfterReturning...因此,代码将返回结果并在索引已过期时清除缓存。所以代码保持不变,只需要将@Before更改为@AfterReturning...请查看上面更新的代码。 - Babl
但是在第一次返回和清空缓存之后,系统会“挂起”或启动多个缓存计算(除非有特定的阻塞),直到缓存再次填充。原始请求是提供最近的结果,即使这些结果已经过时,直到缓存使用新值重新初始化。 - Ilya Dyoshin
1
是的,所以你可以不直接清除值,而是计算新值并将其放在旧值的位置。更改缓存值的代码应该使用一些锁定机制,以便只进行单个计算。我会尝试更新我的代码。 - Babl
@Babel,这将是我写过的相同代码。请看EDIT1。 - Ilya Dyoshin
是的,想法和需求是一样的 :) 只是采用了不同的方法 :) - Babl

2

以下类似的代码可以刷新缓存并保持实现简单明了。

编写清晰简洁的代码符合要求完全没有问题。

@Service
public static class IndexService {
    @Autowired
    private IndexCalculator indexCalculator;

    public Index getIndex() {
        Index cachedIndex = getCachedIndex();

        if (isObsolete(cachedIndex)) {
            evictCache();
            asyncRefreshCache();
        }

        return cachedIndex;
    }

    @Cacheable(cacheNames = "index")
    public Index getCachedIndex() {
        return indexCalculator.calculateIndex();
    }

    public void asyncRefreshCache() {
        CompletableFuture.runAsync(this::getCachedIndex);
    }

    @CacheEvict(cacheNames = "index")
    public void evictCache() { }

    public boolean isObsolete(Index index) {
        long indexTimestamp = index.getTimestamp();
        long currentTimestamp = indexCalculator.getCurrentTimestamp();

        if (index == null || indexTimestamp < currentTimestamp) {
            return true;
        } else {
            return false;
        }
    }
}

这种实现的问题在于,在多线程环境下,它会返回大量的null值。因为当你从键中删除值并在新线程中刷新值时,直到计算出新索引之前,缓存中的索引将为空。此外,你可能会出现数百个运行的异步线程,所有这些线程都尝试计算相同的索引。 - Babl

1

编辑1:

基于@Cacheable@CacheEvict的缓存抽象在这种情况下将不起作用。这些行为如下:在@Cacheable调用期间,如果值在缓存中,则从缓存返回值,否则计算并放入缓存,然后返回;在@CacheEvict期间,该值从缓存中删除,因此从此时起,没有值在缓存中,因此第一个进入@Cacheable的调用将强制重新计算并放入缓存。使用@CacheEvict(condition="")仅会检查条件是否根据此条件在此调用期间从缓存中删除值。因此,在每次失效后,@Cacheable方法都将运行这个繁重的例程以填充缓存。

为了使值存储在缓存管理器中并异步更新,我建议重复使用以下例程:

@Inject
@Qualifier("my-configured-caching")
private Cache cache; 
private ReentrantLock lock = new ReentrantLock();

public Index getIndex() {
    synchronized (this) {
        Index storedCache = cache.get("singleKey_Or_AnythingYouWant", Index.class); 
        if (storedCache == null ) {
             this.lock.lock();
             storedCache = indexCalculator.calculateIndex();
             this.cache.put("singleKey_Or_AnythingYouWant",  storedCache);
             this.lock.unlock();
         }
    }
    if (isObsolete(storedCache)) {
         if (!lock.isLocked()) {
              lock.lock();
              this.asyncUpgrade()
         }
    }
    return storedCache;
}

第一个构造是同步的,只是为了阻止所有即将到来的调用等待,直到第一次调用填充缓存。
然后系统检查缓存是否应该重新生成。如果是,则调用单个异步更新值的调用,并且当前线程返回缓存的值。一旦缓存处于重新计算状态,即将到来的调用将简单地从缓存中返回最新的值。以此类推。
使用这样的解决方案,您将能够重用大量内存,例如Hazelcast缓存管理器,以及多个基于键的缓存存储,并保持缓存实际化和驱逐的复杂逻辑。
或者,如果您喜欢@Cacheable注释,可以按以下方式执行:
@Cacheable(cacheNames = "index", sync = true)
public Index getCachedIndex() {
    return new Index();
}

@CachePut(cacheNames = "index")
public Index putIntoCache() {
    return new Index();
}

public Index getIndex() {
    Index latestIndex = getCachedIndex();

    if (isObsolete(latestIndex)) {
        recalculateCache();
    }

    return latestIndex;
}

private ReentrantLock lock = new ReentrantLock();

@Async
public void recalculateCache() {
    if (!lock.isLocked()) {
        lock.lock();
        putIntoCache();
        lock.unlock();
    }
}

这与上面的方法几乎相同,但重复使用了Spring的缓存注释抽象。

原文: 为什么你要通过缓存来解决这个问题?如果这只是一个简单的值(不是基于键的),你可以以更简单的方式组织代码,记住Spring服务默认是单例的。

类似这样:

@Service
public static class IndexService {
    @Autowired
    private IndexCalculator indexCalculator;

    private Index storedCache; 
    private ReentrantLock lock = new ReentrantLock();

    public Index getIndex() {
        if (storedCache == null ) {
             synchronized (this) {
                 this.lock.lock();
                 Index result = indexCalculator.calculateIndex();
                 this.storedCache = result;
                 this.lock.unlock();
             }
        }
        if (isObsolete()) {
             if (!lock.isLocked()) {
                  lock.lock();
                  this.asyncUpgrade()
             }
        }
        return storedCache;
    }

    @Async
    public void asyncUpgrade() {
        Index result = indexCalculator.calculateIndex();
        synchronized (this) {
             this.storedCache = result;
        }
        this.lock.unlock();
    }

    public boolean isObsolete() {
        long currentTimestamp = indexCalculator.getCurrentTimestamp();
        if (storedCache == null || storedCache.getTimestamp() < currentTimestamp) {
            return true;
        } else {
            return false;
        }
    }
}

即,第一次调用是同步的,你必须等待结果填充。然后,如果存储的值已过时,系统将执行异步更新该值,但当前线程将接收存储的“缓存”值。

我还引入了可重入锁来限制同时对存储索引的单个升级。


我不确定我会说你发布的代码是一个“更简单的方式”。至少与两个注释和一个缓存配置相比不是这样。 而且,我真的很想要一个基于缓存的解决方案。在这种情况/示例中,我没有键,但在其他情况下有。我对一般解决方案感兴趣。 - lexicore
好的,是的,它看起来并不简单,但它确实做到了您所请求的:返回最新的缓存值,而不考虑其过时性,并在其值过时时触发缓存升级。这些注释是简单的抽象,以支持类似于记忆化调用的操作:在调用@Cacheable时,如果有缓存值,则返回缓存值;如果没有,则计算、放入缓存并返回。在调用CacheEvict注释时,从缓存中删除存储的值。这些缓存操作不应该基于缓存中的值对实体进行操作。 - Ilya Dyoshin
如果缓存是基于时间驱逐的,那么我建议您使用基于调度的例程,通过@CachePut方法将其计算结果放入缓存中。如果您想将缓存存储在某个缓存解决方案中,则可以注入预配置的Cache(请参见编辑)。 - Ilya Dyoshin

0
我建议在您的索引服务中使用Guava LoadingCache,就像下面代码示例中所示:

LoadingCache<Key, Graph> graphs = CacheBuilder.newBuilder()
 .maximumSize(1000)
 .refreshAfterWrite(1, TimeUnit.MINUTES)
 .build(
     new CacheLoader<Key, Graph>() {
       public Graph load(Key key) { // no checked exception
         return getGraphFromDatabase(key);
       }
       public ListenableFuture<Graph> reload(final Key key, Graph prevGraph) {
         if (neverNeedsRefresh(key)) {
           return Futures.immediateFuture(prevGraph);
         } else {
           // asynchronous!
           ListenableFutureTask<Graph> task = ListenableFutureTask.create(new Callable<Graph>() {
             public Graph call() {
               return getGraphFromDatabase(key);
             }
           });
           executor.execute(task);
           return task;
         }
       }
     });

您可以通过调用Guava的方法来创建一个异步重新加载缓存加载器:

public abstract class CacheLoader<K, V> {
...

  public static <K, V> CacheLoader<K, V> asyncReloading(
      final CacheLoader<K, V> loader, final Executor executor) {
      ...
      
  }
}

关键是在单独的线程中运行重新加载操作,例如使用ThreadPoolExecutor:

  • 在第一次调用时,缓存由load()方法填充,因此可能需要一些时间才能回答。
  • 在后续调用中,当需要刷新值时,它会异步计算,同时仍然提供旧值。一旦刷新完成,它将提供更新后的值。

0

我认为它可以是这样的

@Autowired
IndexService indexService; // self injection

@Cacheable(cacheNames = INDEX_CACHE_NAME)
@CacheEvict(cacheNames = INDEX_CACHE_NAME, condition = "target.isObsolete(#result) && @indexService.calculateIndexAsync()")
public Index getIndex() {
    return calculateIndex();
}

public boolean calculateIndexAsync() {
    someAsyncService.run(new Runable() {
        public void run() {
            indexService.updateIndex(); // require self reference to use Spring caching proxy
        }
    });
    return true;
}

@CachePut(cacheNames = INDEX_CACHE_NAME)
public Index updateIndex() {
    return calculateIndex();
}

上述代码存在一个问题,如果在更新过程中再次调用getIndex(),它将被重新计算。为了防止这种情况发生,最好不要使用@CacheEvict,让@Cacheable返回过时的值,直到索引计算完成。
@Autowired
IndexService indexService; // self injection

@Cacheable(cacheNames = INDEX_CACHE_NAME, condition = "!(target.isObsolete(#result) && @indexService.calculateIndexAsync())")
public Index getIndex() {
    return calculateIndex();
}

public boolean calculateIndexAsync() {
    if (!someThreadSafeService.isIndexBeingUpdated()) {
        someAsyncService.run(new Runable() {
            public void run() {
                indexService.updateIndex(); // require self reference to use Spring caching proxy
            }
        });
    }
    return false;
}

@CachePut(cacheNames = INDEX_CACHE_NAME)
public Index updateIndex() {
    return calculateIndex();
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接