Java并发锁在Map键级别上

12

有些编写者通过调用putPrice方法来更新价格。读取器使用getPrice获取最新的价格。hasChangedMethod返回一个布尔值,标识自从上次调用getPrice以来价格是否发生了变化。

我正在寻找最快的解决方案。我试图实现针对键级别的线程安全一致的读写映射表。

我认为锁定整个映射表可能会导致性能问题,所以我决定在键级别上进行。不幸的是,这并没有像预期的那样工作,并且阻塞了整个映射表。为什么?你能帮我弄清楚我在这里做错了什么吗?

更新:

我想我们可以总结为两个问题:1. 如果某个键正在更新过程中,如何提供对其余键的自由访问。2. 如何确保我的方法具有原子操作,因为它们需要执行多个读/写操作。例如:getPrice() - 获取价格并更新hasChanged标志。

PriceHolder.java

public final class PriceHolder {

    private ConcurrentMap<String, Price> prices;

    public PriceHolder() {
        this.prices = new ConcurrentHashMap<>();

        //Receive starting prices..
        Price EUR = new Price();
        EUR.setHasChangedSinceLastRead(true);
        EUR.setPrice(new BigDecimal(0));

        Price USD = new Price();
        USD.setHasChangedSinceLastRead(true);
        USD.setPrice(new BigDecimal(0));
        this.prices.put("EUR", EUR);
        this.prices.put("USD", USD);

    }

    /** Called when a price ‘p’ is received for an entity ‘e’ */
    public void putPrice(
            String e,
            BigDecimal p) throws InterruptedException {

            synchronized (prices.get(e)) {
                Price currentPrice = prices.get(e);
                if (currentPrice != null && !currentPrice.getPrice().equals(p)) {
                    currentPrice.setHasChangedSinceLastRead(true);
                    currentPrice.setPrice(p);
                } else {
                    Price newPrice = new Price();
                    newPrice.setHasChangedSinceLastRead(true);
                    newPrice.setPrice(p);
                    prices.put(e, newPrice);
                }
            }
    }

    /** Called to get the latest price for entity ‘e’ */
    public BigDecimal getPrice(String e) {
        Price currentPrice = prices.get(e);
        if(currentPrice != null){
            synchronized (prices.get(e)){
                currentPrice.setHasChangedSinceLastRead(false);
                prices.put(e, currentPrice);
            }
            return currentPrice.getPrice();
        }
        return null;
    }

    /**
     * Called to determine if the price for entity ‘e’ has
     * changed since the last call to getPrice(e).
     */
    public boolean hasPriceChanged(String e) {
        synchronized (prices.get(e)){
            return prices.get(e) != null ? prices.get(e).isHasChangedSinceLastRead() : false;
        }
    }
}

Price.java

public class Price {

    private BigDecimal price;

    public boolean isHasChangedSinceLastRead() {
        return hasChangedSinceLastRead;
    }

    public void setHasChangedSinceLastRead(boolean hasChangedSinceLastRead) {
        this.hasChangedSinceLastRead = hasChangedSinceLastRead;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }

    private boolean hasChangedSinceLastRead = false;

}

1
由于您事先知道键,因此无需使用ConcurrentMap。只需要一个简单的Map即可。在初始化程序时,主线程可以使用默认值填充Map中的这些键和Price对象。您所需要做的就是获取Price对象并对每个get和put操作进行同步。在同步块中不需要使用Thread.sleep(3000);调用,因为在这样的关键路径中等待线程没有意义。 - izce
getPrice(...)方法中,您不需要这行代码prices.put(e, currentPrice);,因为您已经在currentPrice变量中引用了相同的Price对象。 - izce
@Shashank,但是getPrice()中的逻辑怎么样?我更新了一个hasChangedSilnceLastRead属性。例如,如果线程1读取了priceA,线程2修改了priceA,然后线程1再次覆盖了上一次 - 不一致性? - Wild Goat
你的问题不够清晰。我不明白为什么整个地图会被阻塞。然而,我建议你查看http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReadWriteLock.html。它可能可以实现你想要的功能:允许读者在潜在缓慢的写入者争用锁时继续进行。但是你仍然需要保持写入的关键部分尽可能小。 - Alexander Torstling
6
另一个建议:如果你正在编写多线程代码,不要从地图上多次阅读并期望每次都得到相同的结果。 - Alexander Torstling
显示剩余10条评论
7个回答

3

ConcurrentMap 的使用严重依赖于 Java 版本。当您使用 Java 8 或更高版本时,几乎所有功能都是免费的:

public final class PriceHolder {

    private ConcurrentMap<String, Price> prices;

    public PriceHolder() {
        this.prices = new ConcurrentHashMap<>();

        //Receive starting prices..
        Price EUR = new Price();
        EUR.setHasChangedSinceLastRead(true);
        EUR.setPrice(BigDecimal.ZERO);

        Price USD = new Price();
        USD.setHasChangedSinceLastRead(true);
        USD.setPrice(BigDecimal.ZERO);
        this.prices.put("EUR", EUR);
        this.prices.put("USD", USD);

    }

    /** Called when a price ‘p’ is received for an entity ‘e’ */
    public void putPrice(String e, BigDecimal p) {
        prices.compute(e, (k, price)-> {
            if(price==null) price=new Price();
            price.setHasChangedSinceLastRead(true);
            price.setPrice(p);
            return price;
        });
    }

    /** Called to get the latest price for entity ‘e’ */
    public BigDecimal getPrice(String e) {
        Price price = prices.computeIfPresent(e, (key, value) -> {
            value.setHasChangedSinceLastRead(false);
            return value;
        });
        return price==null? null: price.getPrice();
    }

    /**
     * Called to determine if the price for entity ‘e’ has
     * changed since the last call to getPrice(e).
     */
    public boolean hasPriceChanged(String e) {
        final Price price = prices.get(e);
        return price!=null && price.isHasChangedSinceLastRead();
    }
}

并发映射表上的compute…方法在计算期间锁定受影响的条目,同时允许所有其他条目的更新进行。对于像hasPriceChanged这样的简单get访问,只要您在方法中仅调用一次,即将结果保留在本地变量中进行检查,就不需要额外的同步。


在Java 8之前,情况更加复杂。在那里,所有ConcurrentMap提供的都是某些原子更新方法,可以使用它们来以尝试和重复的方式构建更高级别的更新方法。

为了清晰地使用它,最好使值类不可变:

public final class Price {

    private final BigDecimal price;
    private final boolean hasChangedSinceLastRead;

    Price(BigDecimal value, boolean changed) {
      price=value;
      hasChangedSinceLastRead=changed;
    }
    public boolean isHasChangedSinceLastRead() {
        return hasChangedSinceLastRead;
    }
    public BigDecimal getPrice() {
        return price;
    }
}

然后使用它来始终构建一个反映所需新状态的新对象,并使用putIfAbsentreplace执行原子更新:

public final class PriceHolder {

    private ConcurrentMap<String, Price> prices;

    public PriceHolder() {
        this.prices = new ConcurrentHashMap<>();

        //Receive starting prices..
        Price EUR = new Price(BigDecimal.ZERO, true);
        Price USD = EUR; // we can re-use immutable objects...
        this.prices.put("EUR", EUR);
        this.prices.put("USD", USD);
    }

    /** Called when a price ‘p’ is received for an entity ‘e’ */
    public void putPrice(String e, BigDecimal p) {
      Price old, _new=new Price(p, true);
      do old=prices.get(e);
      while(old==null? prices.putIfAbsent(e,_new)!=null: !prices.replace(e,old,_new));
    }

    /** Called to get the latest price for entity ‘e’ */
    public BigDecimal getPrice(String e) {
        for(;;) {
          Price price = prices.get(e);
          if(price==null) return null;
          if(!price.isHasChangedSinceLastRead()
          || prices.replace(e, price, new Price(price.getPrice(), false)))
            return price.getPrice();
        }
    }

    /**
     * Called to determine if the price for entity ‘e’ has
     * changed since the last call to getPrice(e).
     */
    public boolean hasPriceChanged(String e) {
        final Price price = prices.get(e);
        return price!=null && price.isHasChangedSinceLastRead();
    }
}

请注意,计算会产生写入操作的性能开销(60M vs 1B ops/s)。你可能想使用双重检查锁定模式来在通常情况下优化读取性能。 - Ben Manes
1
@Ben Manes:compute 一个写操作。即使它返回现有对象,它也会修改两个属性。因此,如果没有写语义,就需要额外的锁定。如果您假设某个操作不需要执行更新的情况比偶尔发生得更多,则乐观读取将非常有用,但问题并未给出这一点。 - Holger
抱歉,我的意思是在一般情况下进行计算。具体来说,我指的是getPrice使用computeIfPresent。由于问题的核心是性能问题,因此值得指出。 - Ben Manes
1
@Ben Manes:但是即使 getPrice 函数包含写操作,你是正确的,根据实际用例,还有其他替代方案。然而,问题是如何避免锁定整张地图,而这是解决此问题的一种非常简单的方法,只有在性能真正成为问题的情况下,我才会将其变得更加复杂... - Holger

1
如何像这样:
class AtomicPriceHolder {

  private volatile BigDecimal value;
  private volatile boolean dirtyFlag;

  public AtomicPriceHolder( BigDecimal initialValue) {
    this.value = initialValue;
    this.dirtyFlag = true;
  }

  public synchronized void updatePrice( BigDecimal newPrice ) {
    if ( this.value.equals( newPrice ) == false) {
      this.value = newPrice;
      this.dirtyFlag = true;
    }
  }

  public boolean isDirty() {
    return this.dirtyFlag;
  }

  public BigDecimal peek() {
    return this.value;
  }

  public synchronized BigDecimal read() {
    this.dirtyFlag = false;
    return this.value;
  }

}

...

public void updatePrice( String id, BigDecimal value ) {

  AtomicPriceHolder holder;
  synchronized( someGlobalSyncObject ) {
    holder = prices.get(id);
    if ( holder == null ) {
      prices.put( id, new AtomicPriceHolder( value ) );
      return;
    }
  }

  holder.updatePrice( value );

}

请注意,以这种方式进行实际价格值的原子修改非常快,因此您不能指望在解锁地图之前获得任何收益。

条件操作“检查是否在地图中,如果不是,则创建一个新操作并插入”必须是原子性的,并且应通过在短时间内锁定整个映射来完成。否则将需要为每个键创建专用同步对象。这些对象必须存储和管理在某个地方,对该存储区的访问也必须再次进行同步等等。

只需执行粗粒度锁定以确保正确性,然后继续进行即可。


1

hasChangedMethod 返回一个布尔值,用于标识自上次调用 getPrice 以来价格是否已更改。

这是一个有问题的模式,因为 hasPriceChanged 实际上需要根据每个线程返回不同的结果。如果您可以更详细地说明您实际上要做什么(即为什么您认为需要这种模式),可能可以提供另一种选择。例如,考虑完全放弃 hasPriceChanged,并简单地将此数据结构视为规范,并在每次查询时查询其当前值。


话虽如此,这是我可能实现您所寻求的行为的方法。可能会有其他选择,这只是第一步。

保持一个ConcurrentHashMap<String, ThreadLocal<Boolean>>ThreadLocal将按线程存储get调用的状态。我还使用了一个单独的私有锁映射。

ConcurrentHashMap<String, Price> pricesMap;
ConcurrentHashMap<String, ThreadLocal<Boolean>> seenMap;
ConcurrentHashMap<String, Object> lockMap;

private Object getLock(String key) {
  return lockMap.computeIfAbsent(key, k -> new Object());
}

private ThreadLocal<Boolean> getSeen(String key) {
  return seenMap.computeIfAbsent(e,
      ThreadLocal.<Boolean>withInitial(() -> false));
}

public void putPrice(String e, BigDecimal p) {
  synchronized (getLock(e)) {
    // price has changed, clear the old state to mark all threads unseen
    seenMap.remove(e);
    pricesMap.get(e).setPrice(p);
  }
}

public BigDecimal getPrice(String e) {
  synchronized (getLock(e)) {
    // marks the price seen for this thread
    getSeen(e).set(true);
    BigDecimal price = pricesMap.get(e);
    return price != null ? price.getPrice() : null;
  }
}

public boolean hasPriceChanged(String e) {
  synchronized (getLock(e)) {
    return !getSeen(e).get();
  }
}

请注意,虽然数据结构是线程安全的,但这里仍存在竞态条件的风险 - 您可能调用hasPriceChanged()并返回false,紧接着另一个线程更改了价格。消除hasPriceChanged()行为可能会简化您的代码。

太好了,让我试试!顺便说一下,在getSeen()方法中,lambda返回的类型存在一些问题。 - Wild Goat
抱歉,我刚刚是内联编写的,所以可能会有一些小错别字。如果需要的话,您可以始终按Java 7风格编写它。请随意编辑此帖子以进行更改。 - dimo414

1
  1. 如果在更新过程中,如何提供免费访问其余键的方法。

只需使用ConcurrentHashMap即可确保免费访问键;get不会引入任何争用,而put仅锁定子集而不是整个映射。

  1. 由于我的方法需要多个读/写操作,如何保证原子操作?

为确保一致性,您需要在某些共享对象上同步(或使用另一种锁定机制,例如ReentrantLock);我建议创建一个锁对象的ConcurrentHashMap<String, Object>,以便您可以执行以下操作:

synchronized (locks.get(e)) { ... }

只需使用new Object()填充地图。您使用的模式存在风险(在Price对象上锁定),现在这些对象必须持久存在且永远不会被替换。通过拥有专用的私有锁集合来实施比重载值类型作为锁机制更容易。
作为一种附带说明,如果你想在Java中进行货币操作,你应该绝对使用Joda-Money库,而不是重复造轮子。

0

我认为你可以只在特定的键上放置锁。

import java.util.HashMap;

public class CustomHashmap<K, V> extends HashMap<K, V>{

    private static final long serialVersionUID = 1L;

    @Override
    public V put(K key, V value) {
        V val = null;
        synchronized (key) {
            val = super.put(key, value);
        }

        return val;
    }

    @Override
    public V get(Object key) {
        return super.get(key);
    }


}

0

我建议使用可观察者-观察者模式。没必要重复造轮子。请参考Observer和Observable。

我还建议研究一下Condition,因为不需要为所有读取操作锁定整个对象。读取可以并发进行,但写入不能。

如果一个集合是并发的,并不意味着它会自动同步所有内容。它只保证它们的方法是线程安全的。一旦离开函数范围,锁就会释放。因为你需要更高级的同步控制方式,最好自己动手使用普通的HashMap。

一些注意事项:

你过度使用了HashMap.get。考虑获取一次并将其存储在变量中。

synchronized (prices.get(e))

这可能会返回 null,你应该进行检查。在 null 对象上同步是不允许的。

prices.put(e, currentPrice);

我不确定这是否是有意的,但这个操作是不必要的。请参见this


这可能会返回null,你应该检查它。这可能比看起来更难。您无法原子地检查对象是否为null并在其上进行同步。 - JimmyB

0

获得所需结果的最佳方法可能是使用ConcurrentMap来实现map,并将所有其他同步操作放在Price类中。这将导致更简单的代码,在多线程环境中避免微妙的错误,同时实现对每种货币的映射表的同时访问和跟踪是否自上次读取以来有写入。

在Price类中,每当设置价格时,还需要设置hasChangedSinceLastRead;这两个操作应该作为一个原子操作一起执行。每当读取价格时,还要清除hasChangedSinceLastRead;这也应该是一个原子操作。因此,该类只允许这两个操作修改hasChangedSinceLastRead,而不是将逻辑留给其他类,而且这些方法应该是同步的,以确保由多个线程访问时,pricehasChangedSinceLastRead不会失去同步。现在,该类应该看起来像这样:

public class Price {

    public boolean isHasChangedSinceLastRead() {
        return hasChangedSinceLastRead;
    }

    // setHasChangedSinceLastRead() removed

    public synchronized BigDecimal getPrice() {
        hasChangedSinceLastRead = false;
        return price;
    }

    public synchronized void setPrice(BigDecimal newPrice) {
        if (null != price && price.equals(newPrice) {
            return;
        }
        price = newPrice;
        hasChangedSinceLastRead = true;
    }

    private BigDecimal price;
    private volatile boolean hasChangedSinceLastRead = false;
}

请注意,您可以将isHasChangedSinceLastRead()设置为同步,或者将hasChangedSinceLastRead设置为volatile。我选择了后者,让isHasChangedSinceLastRead()不同步,因为同步该方法需要完整的内存屏障,而使变量易失性只需要在读取变量时进行内存屏障的读半部分。
之所以读取hasChangedSinceLastRead需要某种内存屏障,是因为同步方法、块和易失性访问仅保证与其他同步方法、块和易失性访问的有效执行顺序-“发生在”关系。如果isHasChangedSinceLastRead未同步且变量不易失性,则不存在“发生在”关系;在这种情况下,isHasChangedSinceLastRead可能返回“true”,但调用它的线程可能看不到价格的变化。这是因为如果未建立“发生在”关系,则在setPrice()中设置price和hasChangedSinceLastRead的顺序可能被其他线程反向看到。
现在,所有必要的同步都在ConcurrentMap和Price类中,您不再需要在PriceHolder中进行任何同步,PriceHolder也不再需要担心更新hasChangedSinceLastRead。代码简化为:
public final class PriceHolder {

    private ConcurrentMap<String, Price> prices;

    public PriceHolder() {
        prices = new ConcurrentHashMap<>();

        //Receive starting prices..
        Price EUR = new Price();
        EUR.setPrice(new BigDecimal(0));
        this.prices.put("EUR", EUR);

        Price USD = new Price();
        USD.setPrice(new BigDecimal(0));
        this.prices.put("USD", USD);
    }

    /** Called when a price ‘p’ is received for an entity ‘e’ */
    public void putPrice(
        String e,
        BigDecimal p
    ) throws InterruptedException {
        Price currentPrice = prices.get(e);
        if (null == currentPrice) {
            currentPrice = new Price();
            currentPrice = prices.putIfAbsent(e);
        }
        currentPrice.setPrice(p);
    }

    /** Called to get the latest price for entity ‘e’ */
    public BigDecimal getPrice(String e) {
        Price currentPrice = prices.get(e);
        if (currentPrice != null){
            return currentPrice.getPrice();
        }
        return null;
    }

    /**
     * Called to determine if the price for entity ‘e’ has
     * changed since the last call to getPrice(e).
     */
    public boolean hasPriceChanged(String e) {
        Price currentPrice = prices.get(e);
        return null != currentPrice ? currentPrice.isHasChangedSinceLastRead() : false;
    }
}

哇!非常简单且干净。你能否再解释一下为什么我们需要在 hasChangedSinceLastRead 上使用 volatile 关键字吗?我认为如果我们在方法上加入 synchronized,它会锁定整个对象。 - Wild Goat
添加了一段解释为什么 isHasChangedSinceLastRead() 必须同步或 hasChangedSinceLastRead 必须是易失性的。两者都可以,但我们需要其中之一。我选择了后者,因为它具有更少的执行开销。在编辑后的答案中有更多细节。 - Warren Dew

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接