以下代码是线程安全的吗?

3
我有一个场景,需要维护一个 Map,可以由多个线程填充。每个线程都会修改其各自的 List(唯一标识符/键是线程名称)。当线程的列表大小超过固定批量大小时,我们必须将记录持久化到数据库中。
以下是示例代码:
private volatile ConcurrentHashMap<String, List<T>>  instrumentMap = new ConcurrentHashMap<String, List<T>>();
private ReadWriteLock lock ;

public void addAll(List<T> entityList, String threadName) {
    try {
        lock.readLock().lock();
        List<T> instrumentList = instrumentMap.get(threadName);
        if(instrumentList == null) {
            instrumentList = new ArrayList<T>(batchSize);
            instrumentMap.put(threadName, instrumentList);
        }

        if(instrumentList.size() >= batchSize -1){
            instrumentList.addAll(entityList);
            recordSaver.persist(instrumentList); 
            instrumentList.clear();
        } else {
            instrumentList.addAll(entityList);  
        }
    } finally {
        lock.readLock().unlock();
    }

}

每隔两分钟会有一个独立的线程运行,将Map中的所有记录持久化(以确保每两分钟后都有一些东西被持久化,并且Map的大小不会变得太大),并且当它启动时会阻止所有其他线程(检查readLock和writeLock,在writeLock具有更高的优先级)。

if(//Some condition) {
                    Thread.sleep(//2 minutes);
                    aggregator.getLock().writeLock().lock();
                    List<T> instrumentList = instrumentMap .values().stream().flatMap(x->x.stream()).collect(Collectors.toList());
                    if(instrumentList.size() > 0) {

                        saver.persist(instrumentList);
                        instrumentMap .values().parallelStream().forEach(x -> x.clear());
                    aggregator.getLock().writeLock().unlock();
                }

这个解决方案在我们测试的几乎所有场景中都运行得很好,但有时我们会发现一些记录丢失了,即使它们在Map中被正确添加,也没有持久化。

我的问题是,这段代码有什么问题? ConcurrentHashMap 不是最好的解决方案吗? 读写锁的使用在这里有问题吗? 我应该采用顺序处理吗?


你在持有读锁的情况下持续写入东西?你觉得也许应该使用写锁才是正确的选择吗? - Andy Turner
如果你正在使用锁和并发哈希映射,那么你通常是在做一些错误的事情。 - Andy Turner
@Andy Turner,当每个线程修改/持久化自己的独立列表时,读锁有什么问题? - Amit
那么你有什么建议呢?我应该只使用写锁吗?我不能在这里使用普通的HashMap。 - Amit
@AndyTurner,你提出的解决方案很好。但是问题还有另一部分,就像我之前提到的,“每隔2分钟运行一个单独的线程”,现在这个线程正在使用writeLock,这显然与原子操作compute(compute内部使用的锁)不同。你认为我应该使用公平性的写锁吗? - Amit
1个回答

6

不,它不是线程安全的。

问题在于您正在使用ReadWriteLock的“读”锁,这不能保证独占式访问以进行更新。您需要使用“写”锁来实现这一点。

但实际上您甚至不需要使用单独的锁。您可以简单地使用ConcurrentHashMap.compute方法:

instrumentMap.compute(threadName, (tn, instrumentList) -> {
  if (instrumentList == null) {
    instrumentList = new ArrayList<>();
  }

  if(instrumentList.size() >= batchSize -1) {
    instrumentList.addAll(entityList); 
    recordSaver.persist(instrumentList); 
    instrumentList.clear();
  } else {
    instrumentList.addAll(entityList);
  }

  return instrumentList;
});

这使您能够更新列表中的项目,同时保证给定键的独占访问权限。

我怀疑您可以将compute调用拆分成computeIfAbsent(如果没有添加列表)以及computeIfPresent(更新/持久化列表):这两个操作的原子性在这里并不必要。但是,拆分它们也没有什么实际意义。


此外,instrumentMap几乎肯定不应该是易失性的。除非您真的想重新分配其值(考虑到此代码,我对此表示怀疑),否则请删除易失性修饰符并将其声明为final。

类似地,非final锁也是可疑的。如果您坚持使用锁,请将其声明为final。


谢谢您的帮助,我会尝试这个。 - Amit
instrumentList 很可能不应该是 volatile 的,你这里是指 instrumentMap 吗? - Amit
@Amit 是的。打错了。 - Andy Turner
如果重新映射函数被调用两次,这段代码会不会引起问题?根据Javadoc:“默认实现可以在多个线程尝试更新(包括潜在地多次调用重新映射函数)时重试这些步骤。 - Finn
@AndyTurner,你提出的解决方案很好。但是问题还有一个部分,就像我之前提到的,“每隔2分钟运行一个单独的线程”,现在这个线程正在使用writeLock,这显然与原子操作compute不同。你认为我应该使用公平性的write lock吗? - Amit
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接