Java Map 并发更新

12

我想创建一个包含int值的Map,并通过多个线程对它们进行增加。可能会有两个或更多个线程同时增加相同的键。

ConcurrentHashMap 的文档对我来说非常不清楚,因为它说:

检索操作(包括get)通常不会被阻塞,因此可能与更新操作(包括put和remove)重叠。

我想知道是否可以使用以下使用ConcurrentHashMap的代码正确地工作:

myMap.put(X, myMap.get(X) + 1);

如果不行,我该如何管理这种情况?


在Java 8中,可以安全地使用myMap.merge(X, 1, Integer::sum)来完成此操作。 - shmosel
5个回答

13

并发映射不会帮助您的代码实现线程安全。您仍然可能遇到竞态条件:

Thread-1: x = 1, get(x)
Thread-2: x = 1, get(x)
Thread-1: put(x + 1) => 2
Thread-2: put(x + 1) => 2

发生了两个增量,但您仍然只得到+1。仅当您的目标是修改映射本身而不是其内容时,才需要并发映射。即使是最简单的HashMap对于并发读取也是线程安全的,只要该映射不再被改变。

因此,您需要一个原始类型的线程安全包装器,而不是线程安全的映射。可以使用java.util.concurrent.atomic中的某些内容,或者如果需要任意类型,则自己编写锁定容器。


我花了很长时间自问为什么一个包装在AtomicReference中的集合在并发处理时总是不准确。然后我看到了你的回复,感觉轻松了很多! - Java Main

4

一种想法是将ConcurrentMap与AtomicInteger相结合,后者具有一个increment方法。

 AtomicInteger current = map.putIfAbsent(key, new AtomicInteger(1));
 int newValue = current == null ? 1 :current.incrementAndGet();

或者(更高效地,感谢@Keppil)使用额外的代码保护来避免不必要的对象创建:
 AtomicInteger current = map.get(key);
 if (current == null){
     current = map.putIfAbsent(key, new AtomicInteger(1));
 }
 int newValue = current == null ? 1 : current.incrementAndGet();

1
@jolivier replace 可以重试,而 getAndIncrement 不行。 - Marko Topolnik
1
我通常在putIfAbsent()之前检查值是否存在,以避免每次创建新的AtomicInteger - Keppil
2
@jolivier 我肯定会避免每次都实现自己的重试逻辑。对我来说,这完全证明了使用 AtomicInteger 的必要性。它可以本地化地进行重试,甚至无法通过 Java 进行复制。 - Marko Topolnik
1
根据文档 http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ConcurrentHashMap.html#putIfAbsent%28K,%20V%29 ,putIfAbsent 可能会返回 null。在某些情况下,你的代码不会抛出 NPE 吗? - Tvaroh
@Tvaroh:没错。看起来你需要在这里进行空值检查。正在更新答案。 - Thilo
显示剩余5条评论

4
最佳实践。您可以使用HashMap和AtomicInteger。 测试代码:
public class HashMapAtomicIntegerTest {
    public static final int KEY = 10;

    public static void main(String[] args) {
        HashMap<Integer, AtomicInteger> concurrentHashMap = new HashMap<Integer, AtomicInteger>();
        concurrentHashMap.put(HashMapAtomicIntegerTest.KEY, new AtomicInteger());
        List<HashMapAtomicCountThread> threadList = new ArrayList<HashMapAtomicCountThread>();
        for (int i = 0; i < 500; i++) {
            HashMapAtomicCountThread testThread = new HashMapAtomicCountThread(
                    concurrentHashMap);
            testThread.start();
            threadList.add(testThread);
        }
        int index = 0;
        while (true) {
            for (int i = index; i < 500; i++) {
                HashMapAtomicCountThread testThread = threadList.get(i);
                if (testThread.isAlive()) {
                    break;
                } else {
                    index++;
                }
            }
            if (index == 500) {
                break;
            }
        }
        System.out.println("The result value should be " + 5000000
                + ",actually is"
                + concurrentHashMap.get(HashMapAtomicIntegerTest.KEY));
    }
}

class HashMapAtomicCountThread extends Thread {
    HashMap<Integer, AtomicInteger> concurrentHashMap = null;

    public HashMapAtomicCountThread(
            HashMap<Integer, AtomicInteger> concurrentHashMap) {
        this.concurrentHashMap = concurrentHashMap;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            concurrentHashMap.get(HashMapAtomicIntegerTest.KEY)
                    .getAndIncrement();
        }
    }
}

结果:

结果值应该为5000000,实际上是5000000

或者使用HashMap和synchronized,但比前者慢得多

public class HashMapSynchronizeTest {

    public static final int KEY = 10;

    public static void main(String[] args) {

        HashMap<Integer, Integer> hashMap = new HashMap<Integer, Integer>();
        hashMap.put(KEY, 0);
        List<HashMapSynchronizeThread> threadList = new ArrayList<HashMapSynchronizeThread>();
        for (int i = 0; i < 500; i++) {
            HashMapSynchronizeThread testThread = new HashMapSynchronizeThread(
                    hashMap);
            testThread.start();
            threadList.add(testThread);
        }
        int index = 0;
        while (true) {
            for (int i = index; i < 500; i++) {
                HashMapSynchronizeThread testThread = threadList.get(i);
                if (testThread.isAlive()) {
                    break;
                } else {
                    index++;
                }
            }
            if (index == 500) {
                break;
            }
        }
        System.out.println("The result value should be " + 5000000
                + ",actually is" + hashMap.get(KEY));
    }
}

class HashMapSynchronizeThread extends Thread {
    HashMap<Integer, Integer> hashMap = null;

    public HashMapSynchronizeThread(
            HashMap<Integer, Integer> hashMap) {
        this.hashMap = hashMap;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            synchronized (hashMap) {
                hashMap.put(HashMapSynchronizeTest.KEY,
                        hashMap
                                .get(HashMapSynchronizeTest.KEY) + 1);
            }
        }
    }
}

结果:

期望结果值为5000000,实际结果为5000000。

使用ConcurrentHashMap将会得到错误的结果。

public class ConcurrentHashMapTest {

    public static final int KEY = 10;

    public static void main(String[] args) {
        ConcurrentHashMap<Integer, Integer> concurrentHashMap = new ConcurrentHashMap<Integer, Integer>();
        concurrentHashMap.put(KEY, 0);
        List<CountThread> threadList = new ArrayList<CountThread>();
        for (int i = 0; i < 500; i++) {
            CountThread testThread = new CountThread(concurrentHashMap);
            testThread.start();
            threadList.add(testThread);
        }
        int index = 0;
        while (true) {
            for (int i = index; i < 500; i++) {
                CountThread testThread = threadList.get(i);
                if (testThread.isAlive()) {
                    break;
                } else {
                    index++;
                }
            }
            if (index == 500) {
                break;
            }
        }
        System.out.println("The result value should be " + 5000000
                + ",actually is" + concurrentHashMap.get(KEY));
    }
}

class CountThread extends Thread {
    ConcurrentHashMap<Integer, Integer> concurrentHashMap = null;

    public CountThread(ConcurrentHashMap<Integer, Integer> concurrentHashMap) {
        this.concurrentHashMap = concurrentHashMap;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            concurrentHashMap.put(ConcurrentHashMapTest.KEY,
                    concurrentHashMap.get(ConcurrentHashMapTest.KEY) + 1);
        }
    }
}

结果:

预计值应为5000000,实际值为11759


1

您可以将操作放在 synchronized (myMap) {...} 块中。


虽然这个答案在理论上是正确的,但由于性能问题我们不应该这样做。 - best wishes
@bestwishes你所说的性能影响是指什么? - v1shnu

0

你目前的代码并发地改变了你的映射值,所以这样是行不通的。

如果多个线程可以向你的映射中put值,那么你必须使用一个类似ConcurrentHashMap的并发映射,其中包含非线程安全的值,如IntegerConcurrentMap.replace将会实现你想要的效果(或者使用AtomicInteger来简化你的代码)。

如果你的线程只会更改映射中的值(而不是添加/更改键),那么你可以使用一个存储线程安全值标准映射,如AtomicInteger。然后你的线程会调用:map.get(key).incrementAndGet()


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接