Java并发编程:在映射中线程安全地修改值

3
我在Java中遇到了并发和映射的一些问题。基本上,我有多个线程使用(读取和修改)自己的映射,但是每个这些映射都是一个更大的映射的一部分,而该更大的映射正在被另一个线程读取和修改。
我的主方法创建所有线程,线程创建它们各自的映射,然后将其放入“主”映射中:
Map<String, MyObject> mainMap = new HashMap<String, Integer>();
FirstThread t1 = new FirstThread();
mainMap.putAll(t1.getMap());
t1.start();
SecondThread t2 = new SecondThread();
mainMap.putAll(t2.getMap());
t2.start();
ThirdThread t3 = new ThirdThread(mainMap);
t3.start();

我现在面临的问题是第三个(主)线程在地图中看到任意值,取决于另外两个线程何时更新它们自己的项目。然而,我必须保证第三个线程能够遍历和使用地图中的值,而不必担心所读取的部分是“旧”的:

第一个线程(类比于第二个线程):

for (MyObject o : map.values()) {
    o.setNewValue(getNewValue());
}

第三线程:

for (MyObject o : map.values()) {
    doSomethingWith(o.getNewValue());
}

有什么想法吗?我考虑使用全局可访问的(通过静态类的static final Object)锁,在每个线程中同步修改地图时会用到。或者是否有特定的Map实现可以解决这个问题,我可以使用?
提前致谢!
编辑:如@Pyranja所建议的那样,同步getNewValue()方法是可能的。然而,我忘了提到我实际上正在尝试做一些类似于事务的事情,其中t1和t2在t3处理这些值之前/之后修改多个值。 t3的实现方式使得如果值没有改变,则doSomethingWith()实际上不会对该值进行任何操作。

听起来第三个线程需要等待前两个线程。注意:如果是这种情况,您实际上只需要一个额外的线程。 - Peter Lawrey
如果您想确保读取的部分是最新值,我相信您需要在其中使用volatile(以及synchronized)。 - Dhruv Gairola
1
你所看到的行为并不是任何同步问题的结果。对t1的映射和t2的映射的修改永远不会在mainMap中可见,因为在t1和t2开始之前,你正在将它们的初始内容“复制”到mainMap中。(如果你使用了两个映射的视图,则会出现同步问题,但是现在没有通信需要同步。) - jacobm
1
@jacobm 我认为他没有修改地图本身(put/remove),而是修改地图内部的值对象。一旦线程同步,这些更改将在mainMap中可见。 - sharakan
@sharakan 啊,我一开始把“setNewValue”理解成“用不同的值替换映射中的值”的简写,但如果你认为“setNewValue”修改了映射的值本身,那我同意你的看法。 - jacobm
3个回答

3
为了在单个值对象之上进行更高级别的同步,您需要使用锁来处理各个线程之间的同步。一种方法是使用ReadWriteLock,而无需过多更改代码。线程1和线程2是写入者,线程3是读取者。
您可以使用两个锁或一个锁来完成此操作。下面我将概述使用一个锁、两个写入线程和一个读取线程进行操作,而不用担心数据更新期间出现异常(即事务回滚)。
尽管如此,这听起来像是一个经典的生产者-消费者场景。您应该考虑使用类似BlockingQueue这样的东西来实现线程之间的通信,就像这个问题中所概述的那样。
还有其他一些需要考虑更改的事情,例如使用Runnable而不是扩展Thread
private static final class Value {

    public void update() {

    }

}

private static final class Key {

}

private final class MyReaderThread extends Thread {

    private final Map<Key, Value> allValues;

    public MyReaderThread(Map<Key, Value> allValues) {
        this.allValues = allValues;
    }

    @Override
    public void run() {
        while (!isInterrupted()) {
            readData();
        }
    }

    private void readData() {
        readLock.lock();
        try {
            for (Value value : allValues.values()) {
                // Do something
            }
        }
        finally {
            readLock.unlock();
        }

    }
}

private final class WriterThread extends Thread {

    private final Map<Key, Value> data = new HashMap<Key, Value>();

    @Override
    public void run() {
        while (!isInterrupted()) {
            writeData();
        }
    }

    private void writeData() {
        writeLock.lock();

        try {
            for (Value value : data.values()) {
                value.update();
            }
        }
        finally {
            writeLock.unlock();
        }
    }
}

private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

private final ReadLock readLock;
private final WriteLock writeLock;

public Thing() {
    readLock = lock.readLock();
    writeLock = lock.writeLock();
}

public void doStuff() {
    WriterThread thread1 = new WriterThread();
    WriterThread thread2 = new WriterThread();

    Map<Key, Value> allValues = new HashMap<Key, Value>();
    allValues.putAll(thread1.data);
    allValues.putAll(thread2.data);
    MyReaderThread thread3 = new MyReaderThread(allValues);

    thread1.start();
    thread2.start();
    thread3.start();
}

是的!这正是我所寻找的!我想点赞,但我缺乏声望 :( - phex
@phex 好的,有时间再联系我!;) - sharakan
关于BlockingQueue的一些反馈:是的,与其使用HashMap相比,这似乎是最合理的选择。不幸的是,这个软件已经开发到了一个无法再替换Map的阶段:( 但是我肯定会在另一个项目中考虑使用BlockingQueue! - phex
@phex 另外,你应该考虑使用 Runnables 而不是 Thread 子类。请查看我的更新答案。 - sharakan
我必须承认,我在问题中简化了虚拟代码。实际代码确实使用了Runnables :-) - phex

2

ConcurrentHashMapjava.util.concurrent 中的一个线程安全的 Map 实现,它提供比 synchronizedMap 更高程度的并发性。这意味着大量的读操作几乎总是可以并行执行,同时进行的读写操作通常也可以并行执行,而多个同时进行的写操作也经常可以并行执行。(类 ConcurrentReaderHashMap 为多个读操作提供了类似的并行性,但只允许一个活动的写操作。)ConcurrentHashMap 被设计用于优化检索操作。


1
+1 for java.util.concurrent。大多数并发问题的简单答案都可以在那里找到。 - Aurand
是的,但我认为这里的问题不在于同步地图,而在于其中的(可变)值对象。根据已发布的OP代码,地图本身并没有改变,而是值对象发生了变化。 - sharakan
这里的好处在于 ConcurrentHashMap.get() 是一个 volatile 读取。 - Ralf H

1
你的示例代码可能会引起误解。在第一个示例中,你创建了一个 HashMap<String,Integer>,但是第二部分遍历了映射值,这些值实际上是 MyObject。理解共享哪些可变状态以及它们在哪里共享是同步的关键。 Integer 是不可变的。它可以自由共享(但对于 Integer 的引用是可变的 - 必须安全地发布和/或同步)。但是你的代码示例表明,映射被填充了可变的 MyObject 实例。
鉴于映射条目(键 -> MyObject 引用)未被任何线程更改,并且所有映射都在任何线程启动之前创建并安全发布,我认为仅同步修改 MyObject 就足够了。例如:
public class MyObject {
   private Object value;

   synchronized Object getNewValue() {
      return value;
   }

   synchronized void setNewValue(final Object newValue) {
      this.value = newValue;
   }
}

如果我的假设不正确,请澄清您的问题/代码示例,并考虑@jacobm的评论和@Alex的答案。

+1 这听起来对我来说是正确的。 OP 只需要弄清楚他是否希望 t1 和 t2 在迭代完全完成之前就可以访问它。在这种情况下,他需要在 MyObject 的更高级别上进行一些同步。 - sharakan
这也是我考虑过的事情。然而正如@sharakan指出的那样,我在OP中漏掉了一个重要部分: 我在t1、t2和t3中所做的事情都是类似于事务的操作。我想在使用t3迭代这些值之前修改多个值。 - phex

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接