高性能并发MultiMap Java/Scala

61

我正在寻找一个高性能、并发的MultiMap。我已经到处搜索了,但是我无法找到一种使用与ConcurrentHashMap相同方法的解决方案(仅锁定哈希数组的一部分)。

这个Multimap将经常被读取、添加和删除。

多重映射的键将是字符串,其值将是任意的。

我需要O(1)来查找给定键的所有值,删除操作的时间复杂度为O(N)可以接受,但是如果是O(logN)则更好。

对于某个键的最后一个值的删除非常关键,它将从该键中删除容器的所有值,以避免内存泄漏。

编辑:这是我构建的解决方案,可在ApacheV2下获得:索引(多重映射)


7
除了桶算法之类的东西,没有O(1)查找的地图。哈希映射具有非常小的c的O(cn)。 - ziggystar
2
ziggystar,这取决于哈希函数如何分布键。如果它随机分布-对于任意字符串您可以期望现代哈希这样做-那么查找是O(1)的。这也是HashMap Javadoc所说的。 - Thomas Kappler
2
我有一个可能包含数百万个对象的注册表,它们可能共享一些属性,我需要为这些属性建立索引,以便可以检索具有特定属性的所有对象。 - Viktor Klang
1
也许现在是在cstheory.stackexchange.com上寻找答案的时候了?看起来你将会自己编写数据结构... - Daniel C. Sobral
2
@John V:这是我建立的内容:http://gist.github.com/566793 - Viktor Klang
显示剩余10条评论
10个回答

11
为什么不使用一些类似Scala的漂亮方法(例如隐式转换为Iterable或任何所需的东西,以及更新方法)来包装ConcurrentHashMap[T,ConcurrentLinkedQueue[U]]呢?

2
@Viktor - 如果你有(key,value)对,map.get(key).remove(value)应该可以做到这一点,只要在删除键时留下空队列作为占位符,并捕获异常(包括get的NPE)。如果你不能将队列留作占位符,那么除非你锁定整个映射表来清除垃圾,否则很难确保安全。 - Rex Kerr
无法离开队列,因为这会导致内存泄漏。我正在考虑使用ConcurrentHashMap源代码并将其弯曲到我的意愿,但这似乎是一种非常粗糙的方法。 - Viktor Klang
1
这可能是最好的解决方案,您可以限制在找到一个空的队列时才锁定整个集合,我不确定您是否可以避免编写自己的实现或更改您想要使用此嵌套结构而不是Multimap来处理它的方式。 - Jon Freedman
3
当你获取队列时,你能容忍锁定队列,然后在发送更新到地图以清空(k,v)对的同时保持队列不变吗?也就是说,使用队列上的锁而不是整个地图上的锁来避免冲突?并且添加处理情况的逻辑,即在获取队列之前,队列已被清空并从地图中删除——你只需要在获取锁时检查它是否非空。如果为空,则假定它已被删除。 - Rex Kerr
好主意 Rex,我会尝试这种方法,看看是否奏效。 - Viktor Klang
2
对于那些寻找 Viktor 实现的人,你可以在 akka.util.ConcurrentMultiMap 中找到它。非常感谢,Viktor! - Jean-Philippe Pellet

7

您尝试过谷歌收藏吗?它们具有各种Multimap实现。


7
是的,但我不仅仅是要一个 multimap,我需要一个高性能的 并发 multimap。今天早些时候,我检查了他们的源代码,他们的并发 multimap 锁定了整个 map,这创建了一个串行结构。 - Viktor Klang
1
你对于Syncronized的实现是正确的 - 它会锁住整个实例 - 你确定这个集合是性能瓶颈吗? - Jon Freedman
2
是的,这就是最核心的部分。 - Viktor Klang

3

2

我有一个需求,需要使用Map<Comparable, Set<Comparable>>,在Map和相应的Set上进行插入操作时必须是并发的,但是一旦从Map中消耗了一个Key,它就必须被删除。可以将其想象为每两秒运行一次的作业,该作业正在消耗来自特定Key的整个Set<Comparable>,但插入操作是完全并发的,因此大多数值在作业启动时会被缓冲,以下是我的实现方式:

注意:我使用Guava的帮助类Maps创建并发Maps,此解决方案还模拟了Java Concurrency in Practice Listing 5.19

import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

/**
 * A general purpose Multimap implementation for delayed processing and concurrent insertion/deletes.
 *
 * @param <K> A comparable Key
 * @param <V> A comparable Value
 */
public class ConcurrentMultiMap<K extends Comparable, V extends Comparable>
{
  private final int size;
  private final ConcurrentMap<K, Set<V>> cache;
  private final ConcurrentMap<K, Object> locks;

  public ConcurrentMultiMap()
  {
    this(32, 2);
  }

  public ConcurrentMultiMap(final int concurrencyLevel)
  {
    this(concurrencyLevel, 2);
  }

  public ConcurrentMultiMap(final int concurrencyLevel, final int factor)
  {
    size=concurrencyLevel * factor;
    cache=new MapMaker().concurrencyLevel(concurrencyLevel).initialCapacity(concurrencyLevel).makeMap();
    locks=new MapMaker().concurrencyLevel(concurrencyLevel).initialCapacity(concurrencyLevel).weakKeys().weakValues().makeMap();
  }

  private Object getLock(final K key){
    final Object object=new Object();
    Object lock=locks.putIfAbsent(key, object);
    if(lock == null){
      lock=object;
    }
    return lock;
  }

  public void put(final K key, final V value)
  {
    synchronized(getLock(key)){
      Set<V> set=cache.get(key);
      if(set == null){
        set=Sets.newHashSetWithExpectedSize(size);
        cache.put(key, set);
      }
      set.add(value);
    }
  }

  public void putAll(final K key, final Collection<V> values)
  {
    synchronized(getLock(key)){
      Set<V> set=cache.get(key);
      if(set == null){
        set=Sets.newHashSetWithExpectedSize(size);
        cache.put(key, set);
      }
      set.addAll(values);
    }
  }

  public Set<V> remove(final K key)
  {
    synchronized(getLock(key)){
      return cache.remove(key);
    }
  }

  public Set<K> getKeySet()
  {
    return cache.keySet();
  }

  public int size()
  {
    return cache.size();
  }

}

2

我创建了一个 ConcurrentMultiMap mixin,它扩展了 mutable.MultiMap mixin 并具有 concurrent.Map[A, Set[B]] 自身类型。 它按键锁定,具有 O(n) 空间复杂度,但如果您写入不是特别频繁,其时间复杂度相当不错。


1

Aleks下周要来办公室看我,我会在那时和他谈话。 - Viktor Klang
我很好奇想听听这次演讲的情况。你觉得 Ctries 有用吗? - Shlomi
不是为了multimap的目的,我和Bagwell以及Prokopec讨论过制作一个可以作为multimap的实现,但我认为时间不够。Ctries将会进入Scala 2.10。 - Viktor Klang

0
使用Gauava的MultiMaps。 Multimaps.synchronizedMultimap(HashMultimap.create())

1
你的解决方案是“同步的”。请注意一下原始问题提出者所寻找的。 - Stefan Haberl

0

讨论可能已经有些晚了,但是...

在高性能并发编程方面,一个人应该准备好编写解决方案。在使用Concurrent时,“细节决定成败”的声明具有完整的意义。可以实现完全并发和无锁结构。

起点将是非阻塞HashTable http://sourceforge.net/projects/high-scale-lib/,然后根据每个键的值数量以及需要多频繁添加/删除一些写入对象[]来确定是否需要基于数组的带信号量/自旋锁集合。


0

我对这个话题有点晚了,但我认为现在你可以像这样使用Guava:

Multimaps.newSetMultimap(new ConcurrentHashMap<>(), ConcurrentHashMap::newKeySet)

3
Multimaps.newSetMultimap(...)支持并发更新吗?根据Javadoc,"如果有任何并发操作更新多重映射,则该映射在这种情况下是不线程安全的,即使映射和工厂生成的实例也是如此。"请参考:http://google.github.io/guava/releases/23.0/api/docs/com/google/common/collect/Multimaps.html#newSetMultimap-java.util.Map-com.google.common.base.Supplier- - breandan
通过阅读文档和之前的答案,这个代码会起作用吗?:Multimaps.synchronizedSetMultimap(Multimaps.newSetMultimap(new ConcurrentHashMap<>(), ConcurrentHashMap::newKeySet)); - Radu Toader

-1

你看过Javalution吗?它是专为实时等高性能而设计的。


1
找不到multimap,更别说并发和高性能的了 :( - Viktor Klang

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接