使用可重入锁实现阻塞并发

3
我正在尝试使用RentrantLock实现一个类来强制执行我的Java应用程序中的并发性,阻止异步修改给定实体实例(由关键字标识)。其目标是阻塞/排队多个并发修改给定对象实例的尝试,直到先前的线程完成。该类以通用方式实现了这一点,允许任何代码块获取锁并在完成后释放它(与RentrantLock语义相同),同时增加了仅阻塞试图修改对象的同一实例的线程(根据关键字标识)而不是阻塞进入代码块的所有线程的效用。
该类提供了一个简单的结构,只允许为一个实体实例同步一个代码块。例如,如果我希望代码块对来自ID为33的用户的所有线程进行同步,但我不希望来自任何其他用户的线程被服务用户33的线程阻塞。
该类实现如下:
public class EntitySynchronizer {
  private static final int DEFAULT_MAXIMUM_LOCK_DURATION_SECONDS = 300; // 5 minutes
  private Object mutex = new Object();
  private ConcurrentHashMap<Object, ReentrantLock> locks = new ConcurrentHashMap<Object, ReentrantLock>();
  private static ThreadLocal<Object> keyThreadLocal = new ThreadLocal<Object>();
  private int maximumLockDurationSeconds;
  public EntitySynchronizer() {
    this(DEFAULT_MAXIMUM_LOCK_DURATION_SECONDS);
  }
  public EntitySynchronizer(int maximumLockDurationSeconds) {
    this.maximumLockDurationSeconds = maximumLockDurationSeconds;
  }
  /**
   * Initiate a lock for all threads with this key value
   * @param key the instance identifier for concurrency synchronization
   */
  public void lock(Object key) {
    if (key == null) {
      return;
    }
    /*
     * returns the existing lock for specified key, or null if there was no existing lock for the
     * key
     */
    ReentrantLock lock;
    synchronized (mutex) {
      lock = locks.putIfAbsent(key, new ReentrantLock(true));
      if (lock == null) {
        lock = locks.get(key);
      }
    }
    /*
     * Acquires the lock and returns immediately with the value true if it is not held by another
     * thread within the given waiting time and the current thread has not been interrupted. If this
     * lock has been set to use a fair ordering policy then an available lock will NOT be acquired
     * if any other threads are waiting for the lock. If the current thread already holds this lock
     * then the hold count is incremented by one and the method returns true. If the lock is held by
     * another thread then the current thread becomes disabled for thread scheduling purposes and
     * lies dormant until one of three things happens: - The lock is acquired by the current thread;
     * or - Some other thread interrupts the current thread; or - The specified waiting time elapses
     */
    try {
      /*
       * using tryLock(timeout) instead of lock() to prevent deadlock situation in case acquired
       * lock is not released normalRelease will be false if the lock was released because the
       * timeout expired
       */
      boolean normalRelease = lock.tryLock(maximumLockDurationSeconds, TimeUnit.SECONDS);
      /*
       * lock was release because timeout expired. We do not want to proceed, we should throw a
       * concurrency exception for waiting thread
       */
      if (!normalRelease) {
        throw new ConcurrentModificationException(
            "Entity synchronization concurrency lock timeout expired for item key: " + key);
      }
    } catch (InterruptedException e) {
      throw new IllegalStateException("Entity synchronization interrupted exception for item key: "
          + key);
    }
    keyThreadLocal.set(key);
  }
  /**
   * Unlock this thread's lock. This takes care of preserving the lock for any waiting threads with
   * the same entity key
   */
  public void unlock() {
    Object key = keyThreadLocal.get();
    keyThreadLocal.remove();
    if (key != null) {
      ReentrantLock lock = locks.get(key);
      if (lock != null) {
        try {
          synchronized (mutex) {
            if (!lock.hasQueuedThreads()) {
              locks.remove(key);
            }
          }
        } finally {
          lock.unlock();
        }
      } else {
        synchronized (mutex) {
          locks.remove(key);
        }
      }
    }
  }
}

这个类的使用方法如下:

private EntitySynchronizer entitySynchronizer = new EntitySynchronizer();
entitySynchronizer.lock(userId);  // 'user' is the entity by which i want to differentiate my synchronization
try {
  //execute my code here ...
} finally {
  entitySynchronizer.unlock();
}

问题在于它不能完美地工作。在非常高的并发负载下,仍然存在一些情况,其中具有相同“key”的多个线程没有被同步。我已经进行了彻底的测试,但无法弄清楚为什么/在哪里会出现这种情况。
有任何并发专家吗?

你是不是在所有访问中都使用了同一个entitySynchronizer实例? - assylias
3个回答

4
您需要修复的其中一项问题是这个:
ReentrantLock lock;
synchronized (mutex) {
  lock = locks.putIfAbsent(key, new ReentrantLock(true));
  if (lock == null) {
    lock = locks.get(key);
  }
}

这样做完全没有体现出并发映射的意义。为什么不像这样写呢:
ReentrantLock lock = new ReentrantLock(true);
final ReentrantLock oldLock = locks.putIfAbsent(key, lock);
lock = oldLock != null? oldLock : lock;

1
这并不完全正确,因为即使在映射中已经存在该键的锁,您仍将获得锁的新实例。海报真正想要的是lock = locks.putIfAbsent(key, new ReentrantLock(true)); 请参见(http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ConcurrentHashMap.html#putIfAbsent%28K,%20V%29) - Malcolm Smith
马尔科姆说得对,我这样做是因为如果存在锁,我需要重用现有的锁。马尔科,你是否想知道我为什么要在Map周围同步操作?我这样做是因为我怀疑在解锁(方法)的锁管理代码和创建/获取现有锁之间可能存在竞争条件。@马尔科姆 - 那么在您的代码片段中,只有在没有现有锁时才会调用new()操作? - Strykker
@MalcolmSmith 不,OP不想要你的建议。请看我编辑后的答案,那才是我的意思。我认为,OP可以重写代码,使所有对并发映射的操作都是原子的。顺便说一句,Malcolm和我都无条件地实例化了锁,但如果你立即处理它,这并不是问题。如果你想要原子性的putIfAbsent,你必须这样做。 - Marko Topolnik

2
您的解决方案缺乏原子性。请考虑以下情景:
  • 线程A进入lock()并从映射中获取现有锁。
  • 线程B进入相同键的unlock(),解锁并从映射中删除锁(因为线程A尚未调用tryLock())。
  • 线程A成功调用tryLock()
一种可能的选择是跟踪从映射中“签出”的锁数量:
public class EntitySynchronizer {
    private Map<Object, Token> tokens = new HashMap<Object, Token>();
    private ThreadLocal<Token> currentToken = new ThreadLocal<Token>();
    private Object mutex = new Object();

    ...

    public void lock(Object key) throws InterruptedException {
        Token token = checkOut(key);
        boolean locked = false;
        try {
            locked = token.lock.tryLock(maximumLockDurationSeconds, TimeUnit.SECONDS));
            if (locked) currentToken.set(token);
        } finally {
            if (!locked) checkIn(token);
        }
    }

    public void unlock() {
        Token token = currentToken.get();
        if (token != null) {
            token.lock.unlock();
            checkIn(token);
            currentToken.remove();
        }
    }

    private Token checkOut(Object key) {
        synchronized (mutex) {
            Token token = tokens.get(key);
            if (token != null) {
                token.checkedOutCount++;
            } else {
                token = new Token(key); 
                tokens.put(key, token);
            }
            return token;
        }
    }

    private void checkIn(Token token) {
        synchronized (mutex) {
            token.checkedOutCount--;
            if (token.checkedOutCount == 0)
                tokens.remove(token.key);
        }
    }

    private class Token {
        public final Object key;
        public final ReentrantLock lock = new ReentrantLock();
        public int checkedOutCount = 1;

        public Token(Object key) {
            this.key = key;
        }
    }
}

请注意,tokens不必是ConcurrentHashMap,因为它的方法会在同步块中调用。

1
д»–еҸҜд»ҘеҸӘдҪҝз”Ёsynchronized(key)пјҢдҪҶеҸҜиғҪд»–еёҢжңӣжүҖжңүзӯүдәҺequalзҡ„й”®йғҪжҳҜдә’ж–Ҙзҡ„гҖӮ - Marko Topolnik
@axtavt - 在 lock() 方法中,如果 tryLock() 返回 false(由于锁定持续时间超时),那么使用它的代码块仍将被执行正确(checkIn() 将在 finally{} 块中调用),但 unlock() 方法(因此 checkIn())将在代码块结束时再次被调用。另外,我是否可以通过在 synchronized 块中包含我的 tryLock() 来实现你在这里所做的事情? - Strykker
@Strykker:如果tryLock返回false,那么token将不会被放入ThreadLocal中,因此随后的unlock()也将无事发生。如果您愿意,可以为这些情况实现不同的处理方式。您不能将tryLock()放入同步块中,因为它会阻塞其他线程。 - axtavt
@axtavt - 你说得对。我没有看到那个。谢谢,这太好了!你介意我多少使用你发布的内容吗?(抱歉,我还没有足够的声望来投票支持你的答案:() - Strykker
@Strykker:是的,随意使用它。 - axtavt

1

我假设你并没有真正按照以下方式使用你的类:

private EntitySynchronizer entitySynchronizer = new EntitySynchronizer();
entitySynchronizer.lock(userId);  // 'user' is the entity by which i want to differentiate my synchronization
try {
  //execute my code here ...
} finally {
  entitySynchronizer.unlock();
}

但是是否拥有一个实体同步器的单例实例?否则那就是你的问题所在。

没错,我确实有一个EntitySynchronizer的单例实例。我的使用示例有点过于简化了。 - Strykker

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接