我有一个场景,需要维护一个 Map,可以由多个线程填充。每个线程都会修改其各自的 List(唯一标识符/键是线程名称)。当线程的列表大小超过固定批量大小时,我们必须将记录持久化到数据库中。
以下是示例代码:
以下是示例代码:
private volatile ConcurrentHashMap<String, List<T>> instrumentMap = new ConcurrentHashMap<String, List<T>>();
private ReadWriteLock lock ;
public void addAll(List<T> entityList, String threadName) {
try {
lock.readLock().lock();
List<T> instrumentList = instrumentMap.get(threadName);
if(instrumentList == null) {
instrumentList = new ArrayList<T>(batchSize);
instrumentMap.put(threadName, instrumentList);
}
if(instrumentList.size() >= batchSize -1){
instrumentList.addAll(entityList);
recordSaver.persist(instrumentList);
instrumentList.clear();
} else {
instrumentList.addAll(entityList);
}
} finally {
lock.readLock().unlock();
}
}
每隔两分钟会有一个独立的线程运行,将Map中的所有记录持久化(以确保每两分钟后都有一些东西被持久化,并且Map的大小不会变得太大),并且当它启动时会阻止所有其他线程(检查readLock和writeLock,在writeLock具有更高的优先级)。
if(//Some condition) {
Thread.sleep(//2 minutes);
aggregator.getLock().writeLock().lock();
List<T> instrumentList = instrumentMap .values().stream().flatMap(x->x.stream()).collect(Collectors.toList());
if(instrumentList.size() > 0) {
saver.persist(instrumentList);
instrumentMap .values().parallelStream().forEach(x -> x.clear());
aggregator.getLock().writeLock().unlock();
}
这个解决方案在我们测试的几乎所有场景中都运行得很好,但有时我们会发现一些记录丢失了,即使它们在Map中被正确添加,也没有持久化。
我的问题是,这段代码有什么问题? ConcurrentHashMap 不是最好的解决方案吗? 读写锁的使用在这里有问题吗? 我应该采用顺序处理吗?