在删除ConcurrentHashMap条目时迭代

13

我想要定期遍历一个 ConcurrentHashMap 并删除其中的条目,类似于以下代码:

for (Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator(); iter.hasNext(); ) {
    Entry<Integer, Integer> entry = iter.next();
    // do something
    iter.remove();
}

问题在于在我迭代时,另一个线程可能会更新或修改值。如果发生这种情况,那些更新可能会永久丢失,因为当我迭代时,我的线程只看到过期的值,但是 remove() 将删除活动条目。

经过一些考虑,我想出了以下解决方法:

map.forEach((key, value) -> {
    // delete if value is up to date, otherwise leave for next round
    if (map.remove(key, value)) {
        // do something
    }
});

这种方法的一个问题是它无法捕获对可变值的修改,如果这些可变值没有实现equals()方法(比如AtomicInteger),则会出现问题。有没有更好的方法可以安全地处理并发修改呢?


@ClaudioCorsi,这并不会改变我看到的已删除条目的旧版本的事实。 - shmosel
问题在于,您需要能够知道自开始迭代地图以来已更新了什么。 即使您可以知道已更新哪些对象。 仍然有可能另一个线程具有对已处理但未更新的对象的引用。 那个对象会被重新添加还是只是被更新了? 这个对象应该生成另一个回调吗? - Claudio Corsi
这些条目是短期的还是长期的?如果它们是短期的,那么您可以考虑使用弱引用作为值,然后只需处理引用队列。 - Claudio Corsi
@ClaudioCorsi 我在谈论对地图可见的更新,例如 put()merge()compute() 等。 - shmosel
并发容器不提供回调机制来通知线程更改。这是您需要自己实现的内容。即使如此,它也会很复杂,因为您需要知道更新后的值是否已被处理,这只会增加另一层复杂性。最好在完成数据清除之前锁定对映射的所有访问。 - Claudio Corsi
显示剩余15条评论
3个回答

2

你的解决方法可行,但存在一种潜在情况。如果某些条目不断更新,则map.remove(key, value)可能永远不会返回true,直到更新结束。

如果你使用JDK8,这是我的解决方案:

for (Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator(); iter.hasNext(); ) {
    Entry<Integer, Integer> entry = iter.next();
    Map.compute(entry.getKey(), (k, v) -> f(v));
    //do something for prevValue
}
....
private Integer prevValue;

private Integer f(Integer v){
    prevValue = v;
    return null;
}

compute()函数将对给定的值应用f(v)函数,并将结果赋值给全局变量,同时删除该条目。

根据Javadoc文档,该函数是原子性的。

尝试为指定的键及其当前映射值(如果没有当前映射值,则为null)计算映射。整个方法调用是原子性的。其他线程对此地图的某些尝试更新操作可能会在计算正在进行时被阻止,因此计算应该简短而简单,并且不能尝试更新此Map的任何其他映射。


如果 f() 返回 null,compute() 将返回 null。 - shmosel
此外,使用您的方法迭代键会更加直观。 - shmosel

1
您的解决方案实际上相当不错。除此之外,还有其他设施可以构建类似的解决方案(例如使用computeIfPresent()和墓碑值),但它们有自己的注意事项,而且我在稍微不同的用例中使用了它们。
至于使用未实现equals()的类型作为映射值,您可以在对应类型的顶部使用自己的包装器。这是将自定义语义注入ConcurrentMap提供的原子替换/删除操作的最直接方法。
更新
以下是一份草图,展示了如何在ConcurrentMap.remove(Object key, Object value) API基础上构建:
  • 在您使用的可变类型之上定义一个包装器类型,还定义了自定义的equals()方法,建立在当前可变值之上。
  • 在您的BiConsumer中(您传递给forEach的lambda表达式中),创建该值的深层副本(其类型为您的新包装器类型)并执行确定该值是否需要被删除的逻辑。
  • 如果需要删除该值,请调用remove(myKey, myValueCopy)
    • 如果在计算值是否需要被删除时发生了一些并发更改,则remove(myKey, myValueCopy)将返回false(除了ABA问题,这是一个单独的主题)。

以下是一些说明此内容的代码:

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class Playground {

   private static class AtomicIntegerWrapper {
      private final AtomicInteger value;

      AtomicIntegerWrapper(int value) {
         this.value = new AtomicInteger(value);
      }

      public void set(int value) {
         this.value.set(value);
      }

      public int get() {
         return this.value.get();
      }

      @Override
      public boolean equals(Object obj) {
         if (this == obj) {
            return true;
         }
         if (!(obj instanceof AtomicIntegerWrapper)) {
            return false;
         }
         AtomicIntegerWrapper other = (AtomicIntegerWrapper) obj;
         if (other.value.get() == this.value.get()) {
            return true;
         }
         return false;
      }

      public static AtomicIntegerWrapper deepCopy(AtomicIntegerWrapper wrapper) {
         int wrapped = wrapper.get();
         return new AtomicIntegerWrapper(wrapped);
      }
   }

   private static final ConcurrentMap<Integer, AtomicIntegerWrapper> MAP
         = new ConcurrentHashMap<>();

   private static final int NUM_THREADS = 3;

   public static void main(String[] args) throws InterruptedException {
      for (int i = 0; i < 10; ++i) {
         MAP.put(i, new AtomicIntegerWrapper(1));
      }

      Thread.sleep(1);

      for (int i = 0; i < NUM_THREADS; ++i) {
         new Thread(() -> {
            Random rnd = new Random();
            while (!MAP.isEmpty()) {
               MAP.forEach((key, value) -> {
                  AtomicIntegerWrapper elem = MAP.get(key);
                  if (elem == null) {
                     System.out.println("Oops...");
                  } else if (elem.get() == 1986) {
                     elem.set(1);
                  } else if ((rnd.nextInt() & 128) == 0) {
                     elem.set(1986);
                  }
               });
            }
         }).start();
      }

      Thread.sleep(1);

      new Thread(() -> {
         Random rnd = new Random();
         while (!MAP.isEmpty()) {
            MAP.forEach((key, value) -> {
               AtomicIntegerWrapper elem =
                     AtomicIntegerWrapper.deepCopy(MAP.get(key));
               if (elem.get() == 1986) {
                  try {
                     Thread.sleep(10);
                  } catch (Exception e) {}
                  boolean replaced = MAP.remove(key, elem);
                  if (!replaced) {
                     System.out.println("Bailed out!");
                  } else {
                     System.out.println("Replaced!");
                  }
               }
            });
         }
      }).start();
   }
}

您会看到打印出的“已救助!”,与“已替换!”(删除成功,因为没有您关心的并发更新),计算将在某个时刻停止。
  • 如果您删除自定义的equals()方法并继续使用副本,则会看到无休止的“已救助!”,因为副本永远不会被视为与映射中的值相等。
  • 如果您不使用副本,则不会看到“已救助!”的打印输出,并且您将遇到您正在解释的问题-无论并发更改如何,值都将被删除。

其实,我认为我的equals()观点有点不相关。只要值被改变而不是被替换,问题就会出现,因为remove()看到的是同一个引用。我猜如果我们愿意为每个更新创建一个新的包装器,那么这个包装器就可以起作用。 - shmosel
除非我错了,否则我认为这种方法对于被改变的值是有效的。我会更新我的答案,添加一个代码示例来说明这种方法。 - Dimitar Dimitrov
你的解决方案可行,但我的想法对我的要求来说更简单一些。在删除之前,我不需要验证当前值;只要我能看到最新版本,我就想删除任何值。因此,我所需要做的就是在更新时创建一个浅包装副本以避免引用相等,并依靠remove()和更新操作(通过compute()merge()等)的原子性保证来确保成功删除后不能改变值。 - shmosel
我的方法的另一个好处是,包装器不需要实现equals(),并且它可以完全通用,因为它不需要特定于类型的逻辑。对于我的项目来说,重要的是值可以是通用的。 - shmosel
转念一想,我认为可变值根本不是问题。事实上,我们甚至可以使用Iterator.remove()来处理可变值,因为它们不会变得陈旧,而且映射的原子性保证了在删除后不会有任何修改。最初的唯一问题是不可变值,它们可以被替换,从而变得陈旧。解决这个问题的方法是我的解决方法,即调用map.remove(Object, Object)以确保删除的值与我们看到的值相同。 - shmosel
好的,我只能回答你提出的问题,没有任何额外的上下文,我无法预测你试图解决的实际问题是什么。而且我认为我已经回答了前面的问题 : )。我所展示的代码示例将适用于包装可变和不可变类型。但是,我不确定如何在没有“equals()”方法的情况下使包装器复制。 - Dimitar Dimitrov

0

让我们考虑一下你有哪些选项。

  1. 创建一个自己的容器类,其中包含 isUpdated() 操作,并使用你自己的解决方法。

  2. 如果你的映射只包含很少量的元素,并且你频繁地迭代这个映射与添加/删除操作相比。那么使用 CopyOnWriteArrayList 是一个不错的选择。

    CopyOnWriteArrayList<Entry<Integer, Integer>> lookupArray = ...;
  3. 另一种选择是实现你自己的 CopyOnWriteMap

    public class CopyOnWriteMap<K, V> implements Map<K, V>{
    
        private volatile Map<K, V> currentMap;
    
        public V put(K key, V value) {
            synchronized (this) {
                Map<K, V> newOne = new HashMap<K, V>(this.currentMap);
                V val = newOne.put(key, value);
                this.currentMap = newOne; // 原子操作
                return val;
            }
        }
    
        public V remove(Object key) {
            synchronized (this) {
                Map<K, V> newOne = new HashMap<K, V>(this.currentMap);
                V val = newOne.remove(key);
                this.currentMap = newOne; // 原子操作
                return val;
            }
        }
    
        [...]
    }
    

这里有一个负面效应。如果你正在使用写时复制的集合,你的更新将永远不会丢失,但是你可能会再次看到一些之前删除的条目。

最糟糕的情况是,如果地图被复制,删除的条目将每次都被恢复。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接