Java:获取和清除Map的原子性操作

6
我可以帮您进行翻译。以下是您需要翻译的内容:

我希望实现以下逻辑:

-将使用以下结构

//Map<String, CopyOnWriteArrayList> keeping the pending updates 
//grouped by the id of the updated object
final Map<String, List<Update>> updatesPerId = new ConcurrentHashMap<>();

-n个生产者将会向updatesPerId映射表中添加更新(对于相同的id,可以同时添加2个更新)

-一个TimerThread将会定期运行并处理接收到的更新。类似于:

 final Map<String, List<Update>> toBeProcessed = new HashMap<>(updatesPerId);
 updatesPerId.clear();
 // iterate over toBeProcessed and process them

有没有办法在不同步生产者添加逻辑和定时器线程(消费者)的逻辑的情况下使这个逻辑线程安全?我考虑过原子清除+获取,但似乎ConcurrentMap没有提供这样的功能。另外,我必须提到更新应该按更新对象ID保留,因此我不能用队列或其他东西替换映射。
有什么想法吗? 谢谢!

可能使用RWLock可以正常工作。生产者将获取读取锁,消费者将获取写入锁。然而,当您实际上要写入内容时,获取读取锁并不直观。 - lucian.marcuta
请问您所说的“without synchronizing”是什么意思? - Andy Turner
3个回答

5
你可以利用 ConcurrentHashMap.compute 的原子性执行
你可以这样将内容放入 updatesPerId 中:
updatesPerId.compute(k, (k, list) -> {
  if (list == null) list = new ArrayList<>();
  // ... add to the list

  // Return a non-null list, so the key/value pair is stored in the map.
  return list;
});

这个代码没有使用computeIfAbsent方法,因此无法保证原子性。

如果想要在你的线程中移除元素:

for (String key : updatesPerId.keySet()) {
  List<Update> list = updatesPerId.put(key, null);
  updatesPerId.compute(key, (k, list) -> {
    // ... Process the contents of the list.

    // Removes the key/value pair from the map.
    return null;
  });
}

因此,向列表中添加键(或处理该键的所有值)可能会阻塞,如果您恰好同时在两个位置处理该键,则会被阻塞;否则,它将不会被阻塞。

编辑:正如@StuartMarks指出的那样,最好先将所有内容从地图中取出,然后稍后再处理它们,以避免阻止其他线程尝试添加:

Map<String, List<Update>> newMap = new HashMap<>();
for (String key : updatesPerId.keySet()) {
  newMap.put(key, updatesPerId.remove(key));
}
// ... Process entries in newMap.

我会将列表内容复制到其他地方,并从重新映射函数返回 null,稍后再处理这些内容。在重新映射函数内部进行处理可能需要很长时间,这可能会阻塞其他想要更新映射的线程。 - Stuart Marks
@StuartMarks 当然可以。List<Update> thingsToProcess = updatesPerId.remove(key); 这种写法也可以吧? - Andy Turner
1
@StuartMarks同意。我一直在考虑将其编辑到我的答案中;但还没有完全做到。 - Andy Turner
1
@StuartMarks “无需进行其他查找” 适用于在迭代 HashMap.entrySet() 时。对于 ConcurrentHashMap,在 entry set 的迭代器上调用 remove() 或在 map 上调用 remove(key) 没有区别。它甚至不会更安全,涉及并发更新。考虑以下代码:ConcurrentHashMap<String,Integer> m = new ConcurrentHashMap<>(); m.put("foo", 42); Iterator<Map.Entry<String, Integer>> it = m.entrySet().iterator(); Map.Entry<?, ?> e = it.next(); m.put("foo", 100); System.out.println("removing "+e+" from "+m); it.remove(); System.out.println(m); - Holger
@Holger Huh。当前CHM实现确实是这样的。但是依赖这种假设似乎不明智。我仍然建议通过迭代器进行修改,以防CHM实现发生更改,或者调用代码修改为使用不同的映射实现。 - Stuart Marks
显示剩余3条评论

2

我建议使用LinkedBlockingQueue而不是CopyOnWriteArrayList作为映射值。使用COWAL,添加操作的成本会逐渐增加,因此添加N个元素的结果将导致N^2的性能下降。LBQ的添加操作是O(1)。此外,LBQ还有drainTo方法,可以在这里有效地使用。你可以这样做:

final Map<String, Queue<Update>> updatesPerId = new ConcurrentHashMap<>();

制造商:

updatesPerId.computeIfAbsent(id, LinkedBlockingQueue::new).add(update);

消费者:

updatesPerId.forEach((id, queue) -> {
    List<Update> updates = new ArrayList<>();
    queue.drainTo(updates);
    processUpdates(id, updates);
});

这与您的建议有些不同。这种技术会为每个id处理更新,但是允许生产者在此过程中继续向映射添加更新。这会为每个id在映射中留下映射条目和队列。如果ids被经常重复使用,映射条目的数量将达到高水平线。
如果新的ids不断涌入,而旧的ids变得不可用,则映射将不断增长,这可能不是您想要的。如果是这种情况,您可以使用Andy Turner的答案中的技术。
如果消费者确实需要对整个映射进行快照和清除,则必须使用锁定,而这正是您想避免的。

0
有没有办法在不同步生产者添加逻辑和timerThread(消费者)逻辑的情况下使此逻辑线程安全?
简而言之,根据您对“同步”的理解,答案是否定的。
最简单的方法是将您的Map包装成自己的类。
class UpdateManager {
    Map<String,List<Update>> updates = new HashMap<>();
    public void add(Update update) {
        synchronized (updates) {
            updates.computeIfAbsent(update.getKey(), k -> new ArrayList<>()).add(update);
        }
    }
    public Map<String,List<Update>> getUpdatesAndClear() {
        synchronized (updates) {
            Map<String,List<Update>> copy = new HashMap<>(updates);
            updates.clear();
            return copy;
        }
    }
}

有没有办法使这个逻辑线程安全,而不需要对生产者的添加逻辑和计时器线程(消费者)的逻辑进行同步? - Andy Turner
@AndyTurner 是的,我回答了那个问题。 - daniu
你并没有提供具体的解决方案,只是说了“这取决于情况”,然后使用了同步技术。 - Andy Turner
@AndyTurner 我说“不,这是不可能的”。 “取决于”是指“您是否通过锁定来实现客户端的显式同步”。请注意,您的答案也使用了同步,只是隐藏起来了。 - daniu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接