Guava Cache -- Java 应用缓存神器
导语:
Guava 作为Google开源Java 库中的精品成员,在性能、功能上都十分出色,本文将从实际使用的角度,来对Guava进行讲解
在使用Cache之前,首先要考虑两个问题:
1、应该在什么情况下使用它?
2、如何正确的使用它?
作者本人有很大一部分代码都在逻辑层(CGI),这一层的工作大多包括大量的业务逻辑、数据拼接、配置读取。以做一个活动为例子 腾讯课堂Linux运维学院:
左边是一个最近直播列表,CGI 做的工作是要拉取类目下的直播课程以及课程相关信息,并根据时间进行排序,右边的状态是根据访问者的报名情况显示不同的状态。大量的用户访问会导致课程资料读服务压力非常大,但是左侧列表其实每一个用户看到的都是一样的,因此这一部分数据就是使用缓存的典型场景。右边根据每个用户会看到不同数据需要对缓存做特别处理,这里在后面文章会讲解到。
了解到了正确的应用场景,接下来就是了解正确的使用姿势了。
一名合格的缓存,应该具备以下基本素质:
1、能够配置缓存的大小,保持可控的Memory FoodPrint。
2、适应多种场景的数据expire策略。
3、在高并发情况下、能够正常缓存的更新以及返回。
带着这几个问题,来开始介绍Guava Cache这一趁手兵器。
Cache<Key, Value> cache = CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();
try {
cache.get(key, new Callable<Value>() {
@Override
public Value call() throws AnyException {
return doThingsTheHardWay(key);
}
});
} catch (ExecutionException e) {
throw new Exception(e.getCause());
}
上面的代码包括了一个缓存的初始化以及获取数据的操作 。已经大致引入了前面提出的问题。
1、控制缓存的大小:
maximumSize 设定了该缓存的最大存储单位(key)个数。
maximumWeight 是根据设定缓存数据的最大值。 这两个参数可以有效的控制Cache占用内存大小,使用时根据实际情况设定合适的值。
2、缓存更新的策略:
- 根据时间
expireAfterWrite 缓存写入后多久过期。
expireAfterAccess 缓存读取后多久过期。
refreshAfterWrite 缓存写入后多久更新。
- 根据大小
maximumSize
maximumWeight
- 根据引用
weakKeys
weakValues
softValues
根据引用类型策略作者本人使用很少,因为Java虚拟机垃圾回收的时机是不可主动控制的,因此主要使用时间、大小这两种策略。WeakReference 、 SoftReference
3、并发情况下,保证数据的正确更新。
lock();//加锁操作
try {
// re-read ticker once inside the lock
long now = map.ticker.read();
preWriteCleanup(now);
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);//从存储数据的Segament中拿到Entry实例
for (e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash && entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {//判断实例是否存在
valueReference = e.getValueReference();
if (valueReference.isLoading()) {//如何已有数据正在更新,则不再创建实例
createNewEntry = false;
} else {
V value = valueReference.get();
if (value == null) {
enqueueNotification(entryKey, hash, valueReference, RemovalCause.COLLECTED);
} else if (map.isExpired(e, now)) {//若实例已经过期,则标记Expire
// This is a duplicate check, as preWriteCleanup already purged expired
// entries, but let's accomodate an incorrect expiration queue.
enqueueNotification(entryKey, hash, valueReference, RemovalCause.EXPIRED);
} else {
recordLockedRead(e, now);
statsCounter.recordHits(1);
// we were concurrent with loading; don't consider refresh
return value;//返回缓存的数据
}
// immediately reuse invalid entries
writeQueue.remove(e);
accessQueue.remove(e);
this.count = newCount; // write-volatile
}
break;
}
}
if (createNewEntry) {//实例不存在的情况下,创建Loader去拉取
loadingValueReference = new LoadingValueReference<K, V>();
if (e == null) {
e = newEntry(key, hash, first);
e.setValueReference(loadingValueReference);
table.set(index, e);
} else {
e.setValueReference(loadingValueReference);
}
}
} finally {
unlock();//释放锁
postWriteCleanup();
}
当多个请求同时请求数据时且数据过期时,才会上段代码的运行,进入先加锁,保证只有一个线程处理更新操作,更新完成后释放。
到这里,使用场景以及Guava Cache 的使用方法已经了解完了,还需要一个工具来验证缓存的效果。
Guava 提供了recordStats()方法,相当于启动了记录模式,通过Cache.stats()方法可以获取CacheStats对象,里面存储着缓存的使用情况,通过观察它就可以知道缓存的命中率,加载耗时等信息,有了这些数据的反馈就可以调整的缓存的大小以及其他的优化工作了。
==========================================================
下面一起聊下使用缓存时会遇到的问题,以及Guava Cache个人认为好的使用姿势。
缓存使用的最常见的问题,上文中,提到缓存数据拉取出来后,需要添加一些关于每一个访问用户的额外信息,例如拉取出上课列表后,每一个用户针对课程的状态是不一样的(报名状态),通常会犯的一个错误就是直接在缓存数据基础上进行修改,通常我们缓存的对象会是一个Map,或者List,对其引用的修改其实已经修改了对应值本身,这样会造成数据的混乱。因此记得在修改之前将缓存数据先深拷贝。
在上述Guava Cache的使用中,当缓存过期后,此时请求过来就会阻塞等待缓存的重新拉取。。。有没有可能避免掉这种阻塞?例如先把旧的数据返回,去异步更新数据,数据成功更新完成后,再将旧的数据做替换呢?答案是肯定的,Guava Cache提供了Refresh机制。
LoadingCache<Key, Graph> cache = CacheBuilder.newBuilder()
.maximumSize(1000)
.refreshAfterWrite(1, TimeUnit.MINUTES)
.build(
new CacheLoader<Key, Graph>() {
public Graph load(Key key) { // no checked exception
return getGraphFromDatabase(key);
}
public ListenableFuture<Graph> reload(final Key key, Graph prevGraph) {
if (neverNeedsRefresh(key)) {
return Futures.immediateFuture(prevGraph);
} else {
// asynchronous!
ListenableFutureTask<Graph> task = ListenableFutureTask.create(new Callable<Graph>() {
public Graph call() {
return getGraphFromDatabase(key);
}
});
executor.execute(task);
return task;
}
}
});
cache.get(key);
大家可以看到,CacheBuilder的一个新方法 refreshAfterWrite , 并且build方法中多了一个CacheLoader的实例,返回也成了LoadingCache类型,build() 不带Cacheloader返回的是Cache类型 ,LoadingCache 继承了 Cache 接口 新增了 get(),refresh()方法,有兴趣的同学可以查看源码了解类的更详细的结构。
下面代码可以了解到refresh方法被调用的时机(触发时间会导致一个缺陷,后文会提到):
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
checkNotNull(key);
checkNotNull(loader);
try {
if (count != 0) { // read-volatile
ReferenceEntry<K, V> e = getEntry(key, hash);
if (e != null) {
long now = map.ticker.read();
V value = getLiveValue(e, now);
if (value != null) {
recordRead(e, now);
statsCounter.recordHits(1);
return scheduleRefresh(e, key, hash, value, now, loader);
//这里触发refresh,相当于只有在get时才会被触发
//请注意这里将value也传入其中,也就是old值
}
ValueReference<K, V> valueReference = e.getValueReference();
if (valueReference.isLoading()) {
return waitForLoadingValue(e, key, valueReference);
}
}
}
// at this point e is either null or expired;
return lockedGetOrLoad(key, hash, loader);
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause instanceof Error) {
throw new ExecutionError((Error) cause);
} else if (cause instanceof RuntimeException) {
throw new UncheckedExecutionException(cause);
}
throw ee;
} finally {
postReadCleanup();
}
}
这里是 scheduleRefresh 方法具体实现:
V scheduleRefresh(ReferenceEntry<K, V> entry, K key, int hash, V oldValue, long now,
CacheLoader<? super K, V> loader) {
if (map.refreshes() && (now - entry.getWriteTime() > map.refreshNanos)//是否需要更新,配置了refreshAfterWrite 且 时间条件达到
&& !entry.getValueReference().isLoading()) {
V newValue = refresh(key, hash, loader, true);//调用更新方法
if (newValue != null) {
return newValue;
}
}
return oldValue; //如果尚未更新完成,则直接返回oldValue
}
从这外层代码,大家基本上明白refresh的好处。如果想要继续深入了解是如何做到异步去更新的,请继续看下面代码,不感兴趣的可以跳过,毕竟一下子代码看多了产生不适。
@Nullable
V refresh(K key, int hash, CacheLoader<? super K, V> loader, boolean checkTime) {
final LoadingValueReference<K, V> loadingValueReference =
insertLoadingValueReference(key, hash, checkTime);
if (loadingValueReference == null) {
return null;
}
//异步的去拉取更新,result.isDone是非阻塞的。
ListenableFuture<V> result = loadAsync(key, hash, loadingValueReference, loader);
if (result.isDone()) {
try {
return Uninterruptibles.getUninterruptibly(result);
} catch (Throwable t) {
// don't let refresh exceptions propagate; error was already logged
}
}
return null;
}
// 异步调用loadAsync
ListenableFuture<V> loadAsync(final K key, final int hash,
final LoadingValueReference<K, V> loadingValueReference, CacheLoader<? super K, V> loader) {
final ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
//这里的Listener是当更新拉取成功后运行的,他的作用就是将新值替换老值,并且记录。
loadingFuture.addListener(
new Runnable() {
@Override
public void run() {
try {
V newValue = getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
// update loadingFuture for the sake of other pending requests
loadingValueReference.set(newValue);
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown during refresh", t);
loadingValueReference.setException(t);
}
}
}, sameThreadExecutor);
return loadingFuture;
}
//查看LoadFuture方法
//这里就是真正去拉取更新的核心代码
public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
stopwatch.start();
V previousValue = oldValue.get();
try {
if (previousValue == null) {//如果值不存在,当整个类实例第一次实例化时,
//值是不存在的,因此这里会调用CacheLoader中的load方法。
V newValue = loader.load(key);
return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
} else {
//当oldValue存在,这会调用Reload方法。reload方法的默认实现其实就是调用load方法
//load方法是同步实现的,因此如果仅仅使用了refreshAfterWrite这一机制,并不能实现异步加载
//所以代码例子中CacheLoader的reload方法里是将load作为一个task放在一个executors运行的。
ListenableFuture<V> newValue = loader.reload(key, previousValue);
// rely on loadAsync to call set in order to avoid adding a second listener here
return newValue != null ? newValue : Futures.<V>immediateFuture(null);
}
} catch (Throwable t) {
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
return setException(t) ? futureValue : fullyFailedFuture(t);
}
}
Guava Cache 这种异步刷新缓存的方式是作者个人比较倾向的使用姿势,但是这里也有一个问题,缓存不会主动刷新的,刷新操作的触发需要满足两个条件:第一,缓存过期了,也就是refreshAfterWrite(时间参数)这里的时间达到条件,第二,有get()请求。那么问题来了,如果使用者想要严格的定期刷新如何做?这个问题建议创建一个定时刷新的执行器,定期执行refresh(key)方法,但是前提是你得知道缓存中所有的 key (这里要得到所有的key,有很多暴力的方法,既然是暴力的,就不多介绍了)。