使用ConcurrentHashMap实现线程安全性

8
我有如下类。我使用ConcurrentHashMap。我有许多线程写入映射表和一个计时器,每5分钟保存一次映射数据。 我通过在写入映射表条目时使用putIfAbsent()来实现线程安全。然而,当我从中读取并使用clear()方法删除所有条目时,我希望没有其他线程在我读取映射内容并将其删除的过程中写入映射表。显然,即使使用synchronized(lock){},我的代码也不是线程安全的,因为在saveEntries()中拥有锁的线程不一定是在log()方法中写入我的映射表的线程!除非我用相同的锁对象锁定log()中的整个代码! 我想知道是否有其他方法可以在不强制使用外部锁同步的情况下实现线程安全?任何帮助都将不胜感激。
public class Logging {

private static Logging instance;    
private static final String vendor1 = "vendor1";
private static final String vendor2 = "vendor2";    
private static long delay = 5 * 60 * 1000;

private ConcurrentMap<String, Event> vendor1Calls = new ConcurrentHashMap<String, Event>();
private ConcurrentMap<String, Event> vendor2Calls = new ConcurrentHashMap<String, Event>();

private Timer timer;    
private final Object lock = new Object();

private Logging(){
    timer = new Timer();                
    timer.schedule(new TimerTask() {
        public void run() {
            try {
                saveEntries();
            } catch (Throwable t) {
                timer.cancel();
                timer.purge();
            }
        }       
    }, 0, delay);
}

public static synchronized Logging getInstance(){     
    if (instance == null){
        instance = new Logging();
    }
    return instance;
 }

public void log(){      
    ConcurrentMap<String, Event> map;
    String key = "";        

    if (vendor1.equalsIgnoreCase(engine)){
        map = vendor1Calls;
    }else if(vendor2.equalsIgnoreCase(engine)){  
        map = vendor2Calls;
    }else{
        return;
    }       


    key = service + "." + method;
// It would be the code if I use a regular HashMap instead of ConcurrentHashMap
    /*Event event = map.get(key);       

    // Map does not contain this service.method, create an Event for the first     time.
    if(event == null){
        event = new Event();            
        map.put(key, event);

        // Map already contains this key, just adjust the numbers.
    }else{
        // Modify the object fields
    }*/
    //}

    // Make it thread-safe using CHM
    Event newEvent = new Event();
    Event existingEvent= map.putIfAbsent(key, newEvent); 

    if(existingEvent!=null && existingEvent!=newEvent){
        // Modify the object fields
}       

private void saveEntries(){

    Map<String, List<Event>> engineCalls = null;
    try {           

        engineCalls = new HashMap<String, List<Event>>();
        List<Event> events = null;

// How can I achieve therad safety here w/o applying any lock?
        //synchronized(lock){
            if(!vendor1Calls.isEmpty()){
                events = new ArrayList<Event>();
                events.addAll(vendor1Calls.values());
                engineCalls.put(vendor1, events);
                vendor1Calls.clear();
            }
            if(!vendor2Calls.isEmpty()){
                events = new ArrayList<Event>();
                events.addAll(vendor2Calls.values());
                engineCalls.put(vendor2, events);
                vendor2Calls.clear();
            }
        //}

// logICalls() saves the events in the DB.          
        DBHandle.logCalls(engineCalls);
    } catch (Throwable t) {         
    } finally {
        if(engineCalls!=null){
            engineCalls.clear();
        }                       
    }   
}       

}


2
您可以通过点击答案左侧的灰色复选标记来接受答案。但是,如果答案并未解决您的问题,请不要接受它,即使有同行的压力也不要接受。 - Marko Topolnik
是的,我建议您访问http://stackoverflow.com/users/1052348/bluesky?tab=questions,点击每个链接,并在最佳解决方案旁边点击复选标记以接受它。 PS:您可以接受自己提出的问题的答案。 - Kevin Jin
跨贴到这里,你可以仅使用CHM功能使其原子化。 - jtahlborn
我知道这是一个老问题,但我不明白为什么你要使用一个看起来本质上像队列的映射。你可以将日志条目添加到并发队列中,然后在保存条目时查看队列的大小,并从队列的前面删除相应数量的条目并写入它们。因为你只查看一次大小,所以如果在保存它们时添加了更多条目,那么它们将留待下一次。如果我误解了你的目的,请原谅。 - David Conrad
5个回答

3
然而,当我从中读取并通过clear()方法删除所有条目时,我希望在我读取映射内容并删除它们的过程中,没有其他线程写入映射。

我认为你想表达的是你不介意严格锁定映射。相反,你真正关心的只是在vender1Calls.values()和vendor1Calls.clear()之间丢失任何日志条目,对吗?

如果是这样的话,我可以想象你可以将

events.addAll(vendor1Calls.values());
vendor1Calls.clear();

在saveEntries中使用以下内容:

for (Iterator<Event> iter = vendor1Calls.values().iterator(); iter.hasNext(); ) {
    Event e = iter.next();
    events.add(e);
    iter.remove();
}

那么,你只会删除已添加到事件列表中的事件。在 saveEntries() 执行时仍然可以写入 vendor1Calls 映射,但迭代器将跳过已添加的值。

它可能实际上会迭代在其间添加的值,但是并没有保证。 - Marko Topolnik
在迭代时编写的条目可能会被保存或不保存(“迭代器和枚举返回反映哈希表状态的元素,该状态是迭代器/枚举创建后某个时间点或之后的状态。”),但有一件事是确定的:绝对不会丢失任何条目,因为通过在saveEntries()中调用iter.remove()删除的那些条目已经添加到了事件ArrayList中。 - Kevin Jin
我假设这里发生的是:TimerThread执行以下操作:Iterator<Event> iter = vendorCalls.values().iterator();它会获取vendorCalls此刻的快照,并遍历它们,填充事件列表然后从vendorCalls中删除该条目;因此,它不关心在log()方法中从其他线程添加的条目;所以也不会抛出ConcurrentModificationEx。循环结束时,当iter完成通过其快照的迭代时,我将Events的内容保存到db中,并且每5...重复此过程。 - blueSky
每5分钟我会保存在地图中添加的内容,下次调用saveEntries()方法时再保存。所以当你提到:“如果你不介意TimerThread在创建初始迭代器后可能会看到Map中的元素,那么这应该可以工作”,它可能会引起什么问题? - blueSky
@blueSky Iterator<Event> iter = vendorCalls.values().iterator(); 返回的是 Map 本身的视图,而不是快照。两者之间的区别在于快照是在调用时刻的内容。视图是在 Iterator 的方法调用时刻的 Map 内容。这意味着当你调用 .iterator() 时,Map 的大小可能是10,而在某个 next 调用时,大小可能是15。但是这种差异并不保证。 - John Vint
显示剩余13条评论

3
没有任何外部同步,你无法通过CHM实现这一点。返回的迭代器视图是弱一致的,这意味着Map的内容在你实际迭代时可能会发生更改。
看起来你需要使用Collections.synchronizedMap方法来获得你要寻找的功能。
编辑以使我的观点更加清晰:
要通过synchronizedMap实现这一点,首先必须对Map进行同步,然后才能迭代或将其内容复制到另一个Map中,然后清除。
Map map = Collections.synchronizedMap(new HashMap());

public void work(){
  Map local = new HashMap();
  synchronized(map){
     local.putAll(map);
     map.clear();
  }
  //do work on local instance 
}

就像我之前提到的那样,您可以使用迭代+删除来代替local实例,具体操作可以参考@Kevin Jin的回答。


synchronizedMap的迭代器在面对映射并发更改时毫无用处。另一方面,CHM迭代器捕获任何后续条目都没有问题,甚至可能更好,因为会保存更多的条目并从映射中删除。 - Marko Topolnik
@Marko Topolnik,我是在谈论原子性问题。我应该更明显地表明他必须对Map本身进行synchronize。这显然会导致性能下降,但如果他需要在另一个线程迭代并清除时没有线程更新,则没有其他方法可以做到这一点。然而,从他的需求来看,情况可能并非如此。 - John Vint
@JohnVint:由于迭代器不给我快照,那么我会不会陷入永无止境的循环中!因为线程实际上可能比我删除它们更快地向映射表中添加条目!所以有没有办法在不使用锁的情况下获取快照或使其原子化? - blueSky
理论上是可以的,但在实践中不太可能。您可以通过在迭代之前获取大小来限制速度,如果计数超过了限制,您可以提前退出。话虽如此,无限循环并不是我担心的事情。 - John Vint
谢谢大家的时间和回复,非常有帮助。我会考虑一下...看起来在这里让它原子化不是一个选项。我会选择synchronizedMap或者只是遍历。再次感谢。 - blueSky

0
以下代码使用 persistent map,它来自于functional java项目。它使用更多的内存,但在多个线程中使用是安全的(AFAIK :))。AtomicReference 中唯一可变的值是通过compare-and-set更新的。该映射和事件不可变,因此是线程安全的。而且,我不是清除地图,而是替换对它的引用。
import fj.F;
import fj.Ord;
import fj.data.TreeMap;

import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

public class Logging
{
    // Event is immutable
    private static class Event
    {
        // updates are done by creating new values
        Event update(String key)
        {
            return new Event();
        }
    }

    // Refactored code pertaining to one vendor into a separate class.
    private static class EngineLogger
    {
        public final String vendor;
        private final AtomicReference<TreeMap<String, Event>> mapRef =
                new AtomicReference<TreeMap<String, Event>>(TreeMap.<String, Event>empty(Ord.stringOrd));

        private EngineLogger(String vendor)
        {
            this.vendor = vendor;
        }

        public void log(String service, String method)
        {
            final String key = service + "." + method;
            boolean updated = false;
            while (! updated)
            {
                // get the current value of the map
                TreeMap<String, Event> currMap = mapRef.get();

                // create an updated value of the map, which is the current map plus info about the new key
                TreeMap<String, Event> newMap = currMap.update(key, new F<Event, Event>()
                {
                    @Override
                    public Event f(Event event)
                    {
                        // Modify the object fields of event, if the map contains the key
                        return event.update(key);
                    }
                    // create a new event if the map does not contain the key
                }, new Event());

                // compare-and-set the new value in .. repeat until this succeeds
                updated = mapRef.compareAndSet(currMap, newMap);
            }
        }

        public List<Event> reset()
        {
            /* replace the reference with a new map */
            TreeMap<String, Event> oldMap = mapRef.getAndSet(TreeMap.<String, Event>empty(Ord.stringOrd));

            /* use the old map to generate the list */
            return new ArrayList<Event>(oldMap.toMutableMap().values());
        }
    }

    private static Logging instance;
    private static long delay = 5 * 60 * 1000;
    private final Timer timer;

    private final EngineLogger vendor1 = new EngineLogger("vendor1");
    private final EngineLogger vendor2 = new EngineLogger("vendor2");

    private Logging()
    {
        timer = new Timer();
        timer.schedule(new TimerTask()
        {
            public void run()
            {
                try
                {
                    saveEntries();
                }
                catch (Throwable t)
                {
                    timer.cancel();
                    timer.purge();
                }
            }
        }, 0, delay);
    }

    public static synchronized Logging getInstance()
    {
        if (instance == null)
        {
            instance = new Logging();
        }
        return instance;
    }

    public void log(String engine, String service, String method)
    {
        if (vendor1.vendor.equals(engine))
        {
            vendor1.log(service, method);
        }
        else if (vendor2.vendor.equals(engine))
        {
            vendor2.log(service, method);
        }
    }

    private void saveEntries()
    {
        Map<String, List<Event>> engineCalls = new HashMap<String, List<Event>>();
        engineCalls.put(vendor1.vendor, vendor1.reset());
        engineCalls.put(vendor2.vendor, vendor2.reset());
        DBHandle.logCalls(engineCalls);
    }
}

尽管由于内存使用情况可能不适合我的目的,但非常有帮助。非常感谢分享。 - blueSky
经过更深思熟虑,我意识到这种解决方案的可扩展性很差。即使不同线程为不同的键修改事件,log方法中的_CAS_也会失败。 - Binil Thomas
感谢更新。由于内存问题,我无法使用它。了解到有什么新内容总是很棒的 :) - blueSky

0

这个示例的原子版本 在此线程中 显示(仅使用ConcurrentMap中的功能)。


0

我的最佳建议是使用ReadWriteLock,但由于您明确表示不想使用任何锁(顺便说一句,ConcurrentHashMap可能会在内部使用它们),您可以尝试以下方法。

对于每个映射,使用AtomicReference,当需要记录其内容时,使用getAndSet将旧映射替换为全新的空映射。

现在你有一个独占使用的 Map,可以遍历并清除任意数量的内容。不幸的是,存在一个小问题(使用锁可以解决),那就是如果另一个线程正在添加内容到 Map 中,而此时你将其与一个空 Map 交换,就会出现问题。你可以在这一点上添加延迟,希望等待足够长的时间让其他线程完成它们正在做的事情。也许有一些内部功能可以使用 ConcurrentHashMap,等待所有人完成对其的操作。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接