Java并发编程:共享访问的配对锁

9
我正在寻找一个Java实现的并发语义,类似于ReadWriteLock但是对称的。具体而言,它有两个锁,分别叫做A和B。锁A是共享的,锁B也是共享的。如果任何线程持有锁A,则没有线程可以获取锁B;如果任何线程持有锁B,则没有线程可以获取锁A。目前我已经使用ReadWriteLock近似达到了期望的功能,但感觉像是一个hack,并且在高负载下可能会影响程序性能。是否有现有的库类可以实现这一点?

这里有几个答案,但没有一个涉及公平性。锁的一侧饥饿可能是一个严重的问题,特别是如果其中一个组比另一个组运行得更频繁(这似乎是您的情况)。如果总是有一个_A_在运行,那么_B_永远不会运行。 - teppic
4个回答

3

简短回答:

在标准库中,没有像您需要的那样的东西。

详细回答:

要轻松实现自定义Lock,您应该子类化或委托给AbstractQueuedSynchronizer

以下代码是一个非公平锁的示例,它实现了您需要的内容,包括一些(非穷尽)测试。我称之为LeftRightLock,因为您的要求具有二进制特性。

概念非常简单:

AbstractQueuedSynchronizer公开了使用比较和交换习语(compareAndSetState(int expect, int update))原子地设置int状态的方法,我们可以使用暴露的状态来保持持有锁的线程计数,在右锁被持有的情况下将其设置为正值,在左锁被持有的情况下将其设置为负值。

然后我们只需要确保以下条件: - 仅当内部AbstractQueuedSynchronizer的状态为或负数时,您才能锁定Left - 仅当内部AbstractQueuedSynchronizer的状态为或正数时,您才能锁定Right

LeftRightLock.java


import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Lock;

/**
 * A binary mutex with the following properties:
 *
 * Exposes two different {@link Lock}s: LEFT, RIGHT.
 *
 * When LEFT is held other threads can acquire LEFT but thread trying to acquire RIGHT will be
 * blocked. When RIGHT is held other threads can acquire RIGHT but thread trying to acquire LEFT
 * will be blocked.
 */
public class LeftRightLock {

    public static final int ACQUISITION_FAILED = -1;
    public static final int ACQUISITION_SUCCEEDED = 1;

    private final LeftRightSync sync = new LeftRightSync();

    public void lockLeft() {
        sync.acquireShared(LockSide.LEFT.getV());
    }

    public void lockRight() {
        sync.acquireShared(LockSide.RIGHT.getV());
    }

    public void releaseLeft() {
        sync.releaseShared(LockSide.LEFT.getV());
    }

    public void releaseRight() {
        sync.releaseShared(LockSide.RIGHT.getV());
    }

    public boolean tryLockLeft() {
        return sync.tryAcquireShared(LockSide.LEFT) == ACQUISITION_SUCCEEDED;
    }

    public boolean tryLockRight() {
        return sync.tryAcquireShared(LockSide.RIGHT) == ACQUISITION_SUCCEEDED;
    }

    private enum LockSide {
        LEFT(-1), NONE(0), RIGHT(1);

        private final int v;

        LockSide(int v) {
            this.v = v;
        }

        public int getV() {
            return v;
        }
    }

    /**
     * <p>
     * Keep count the count of threads holding either the LEFT or the RIGHT lock.
     * </p>
     *
     * <li>A state ({@link AbstractQueuedSynchronizer#getState()}) greater than 0 means one or more threads are holding RIGHT lock. </li>
     * <li>A state ({@link AbstractQueuedSynchronizer#getState()}) lower than 0 means one or more threads are holding LEFT lock.</li>
     * <li>A state ({@link AbstractQueuedSynchronizer#getState()}) equal to zero means no thread is holding any lock.</li>
     */
    private static final class LeftRightSync extends AbstractQueuedSynchronizer {


        @Override
        protected int tryAcquireShared(int requiredSide) {
            return (tryChangeThreadCountHoldingCurrentLock(requiredSide, ChangeType.ADD) ? ACQUISITION_SUCCEEDED : ACQUISITION_FAILED);
        }    

        @Override
        protected boolean tryReleaseShared(int requiredSide) {
            return tryChangeThreadCountHoldingCurrentLock(requiredSide, ChangeType.REMOVE);
        }

        public boolean tryChangeThreadCountHoldingCurrentLock(int requiredSide, ChangeType changeType) {
            if (requiredSide != 1 && requiredSide != -1)
                throw new AssertionError("You can either lock LEFT or RIGHT (-1 or +1)");

            int curState;
            int newState;
            do {
                curState = this.getState();
                if (!sameSide(curState, requiredSide)) {
                    return false;
                }

                if (changeType == ChangeType.ADD) {
                    newState = curState + requiredSide;
                } else {
                    newState = curState - requiredSide;
                }
                //TODO: protect against int overflow (hopefully you won't have so many threads)
            } while (!this.compareAndSetState(curState, newState));
            return true;
        }    

        final int tryAcquireShared(LockSide lockSide) {
            return this.tryAcquireShared(lockSide.getV());
        }

        final boolean tryReleaseShared(LockSide lockSide) {
            return this.tryReleaseShared(lockSide.getV());
        }

        private boolean sameSide(int curState, int requiredSide) {
            return curState == 0 || sameSign(curState, requiredSide);
        }

        private boolean sameSign(int a, int b) {
            return (a >= 0) ^ (b < 0);
        }

        public enum ChangeType {
            ADD, REMOVE
        }
    }
}

LeftRightLockTest.java


import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class LeftRightLockTest {


    int logLineSequenceNumber = 0;
    private LeftRightLock sut = new LeftRightLock();

    @Test(timeout = 2000)
    public void acquiringLeftLockExcludeAcquiringRightLock() throws Exception {
        sut.lockLeft();


        Future<Boolean> task = Executors.newSingleThreadExecutor().submit(() -> sut.tryLockRight());
        assertFalse("I shouldn't be able to acquire the RIGHT lock!", task.get());
    }

    @Test(timeout = 2000)
    public void acquiringRightLockExcludeAcquiringLeftLock() throws Exception {
        sut.lockRight();
        Future<Boolean> task = Executors.newSingleThreadExecutor().submit(() -> sut.tryLockLeft());
        assertFalse("I shouldn't be able to acquire the LEFT lock!", task.get());
    }

    @Test(timeout = 2000)
    public void theLockShouldBeReentrant() throws Exception {
        sut.lockLeft();
        assertTrue(sut.tryLockLeft());
    }

    @Test(timeout = 2000)
    public void multipleThreadShouldBeAbleToAcquireTheSameLock_Right() throws Exception {
        sut.lockRight();
        Future<Boolean> task = Executors.newSingleThreadExecutor().submit(() -> sut.tryLockRight());
        assertTrue(task.get());
    }

    @Test(timeout = 2000)
    public void multipleThreadShouldBeAbleToAcquireTheSameLock_left() throws Exception {
        sut.lockLeft();
        Future<Boolean> task = Executors.newSingleThreadExecutor().submit(() -> sut.tryLockLeft());
        assertTrue(task.get());
    }

    @Test(timeout = 2000)
    public void shouldKeepCountOfAllTheThreadsHoldingTheSide() throws Exception {

        CountDownLatch latchA = new CountDownLatch(1);
        CountDownLatch latchB = new CountDownLatch(1);


        Thread threadA = spawnThreadToAcquireLeftLock(latchA, sut);
        Thread threadB = spawnThreadToAcquireLeftLock(latchB, sut);

        System.out.println("Both threads have acquired the left lock.");

        try {
            latchA.countDown();
            threadA.join();
            boolean acqStatus = sut.tryLockRight();
            System.out.println("The right lock was " + (acqStatus ? "" : "not") + " acquired");
            assertFalse("There is still a thread holding the left lock. This shouldn't succeed.", acqStatus);
        } finally {
            latchB.countDown();
            threadB.join();
        }

    }

    @Test(timeout = 5000)
    public void shouldBlockThreadsTryingToAcquireLeftIfRightIsHeld() throws Exception {
        sut.lockLeft();

        CountDownLatch taskStartedLatch = new CountDownLatch(1);

        final Future<Boolean> task = Executors.newSingleThreadExecutor().submit(() -> {
            taskStartedLatch.countDown();
            sut.lockRight();
            return false;
        });

        taskStartedLatch.await();
        Thread.sleep(100);

        assertFalse(task.isDone());
    }

    @Test
    public void shouldBeFreeAfterRelease() throws Exception {
        sut.lockLeft();
        sut.releaseLeft();
        assertTrue(sut.tryLockRight());
    }

    @Test
    public void shouldBeFreeAfterMultipleThreadsReleaseIt() throws Exception {
        CountDownLatch latch = new CountDownLatch(1);

        final Thread thread1 = spawnThreadToAcquireLeftLock(latch, sut);
        final Thread thread2 = spawnThreadToAcquireLeftLock(latch, sut);

        latch.countDown();

        thread1.join();
        thread2.join();

        assertTrue(sut.tryLockRight());

    }

    @Test(timeout = 2000)
    public void lockShouldBeReleasedIfNoThreadIsHoldingIt() throws Exception {
        CountDownLatch releaseLeftLatch = new CountDownLatch(1);
        CountDownLatch rightLockTaskIsRunning = new CountDownLatch(1);

        Thread leftLockThread1 = spawnThreadToAcquireLeftLock(releaseLeftLatch, sut);
        Thread leftLockThread2 = spawnThreadToAcquireLeftLock(releaseLeftLatch, sut);

        Future<Boolean> acquireRightLockTask = Executors.newSingleThreadExecutor().submit(() -> {
            if (sut.tryLockRight())
                throw new AssertionError("The left lock should be still held, I shouldn't be able to acquire right a this point.");
            printSynchronously("Going to be blocked on right lock");
            rightLockTaskIsRunning.countDown();
            sut.lockRight();
            printSynchronously("Lock acquired!");
            return true;
        });

        rightLockTaskIsRunning.await();

        releaseLeftLatch.countDown();
        leftLockThread1.join();
        leftLockThread2.join();

        assertTrue(acquireRightLockTask.get());
    }

    private synchronized void printSynchronously(String str) {

        System.out.println(logLineSequenceNumber++ + ")" + str);
        System.out.flush();
    }

    private Thread spawnThreadToAcquireLeftLock(CountDownLatch releaseLockLatch, LeftRightLock lock) throws InterruptedException {
        CountDownLatch lockAcquiredLatch = new CountDownLatch(1);
        final Thread thread = spawnThreadToAcquireLeftLock(releaseLockLatch, lockAcquiredLatch, lock);
        lockAcquiredLatch.await();
        return thread;
    }

    private Thread spawnThreadToAcquireLeftLock(CountDownLatch releaseLockLatch, CountDownLatch lockAcquiredLatch, LeftRightLock lock) {
        final Thread thread = new Thread(() -> {
            lock.lockLeft();
            printSynchronously("Thread " + Thread.currentThread() + " Acquired left lock");
            try {
                lockAcquiredLatch.countDown();
                releaseLockLatch.await();
            } catch (InterruptedException ignore) {
            } finally {
                lock.releaseLeft();
            }

            printSynchronously("Thread " + Thread.currentThread() + " RELEASED left lock");
        });
        thread.start();
        return thread;
    }
}

1
我不知道有任何库可以满足你的要求。即使有这样的库,它的价值也很小,因为每次你的请求更改时,该库都会停止执行其功能。
实际问题是“如何使用自定义规范实现自己的锁?”
Java 提供了一个名为 AbstractQueuedSynchronizer 的工具。它有广泛的文档。除了文档外,可能需要查看 CountDownLatchReentrantLock 的源代码,并将它们用作示例。
针对您的特定请求,请参见下面的代码,但请注意,它不公平且未经测试。
public class MultiReadWriteLock implements ReadWriteLock {

    private final Sync sync;
    private final Lock readLock;
    private final Lock writeLock;

    public MultiReadWriteLock() {
        this.sync = new Sync();
        this.readLock = new MultiLock(Sync.READ, sync);
        this.writeLock = new MultiLock(Sync.WRITE, sync);
    }

    @Override
    public Lock readLock() {
        return readLock;
    }

    @Override
    public Lock writeLock() {
        return writeLock;
    }

    private static final class Sync extends AbstractQueuedSynchronizer {

        private static final int READ = 1;
        private static final int WRITE = -1;

        @Override
        public int tryAcquireShared(int arg) {
            int state, result;
            do {
                state = getState();
                if (state >= 0 && arg == READ) {
                    // new read
                    result = state + 1;
                } else if (state <= 0 && arg == WRITE) {
                    // new write
                    result = state - 1;
                } else {
                    // blocked
                    return -1;
                }
            } while (!compareAndSetState(state, result));
            return 1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            int state, result;
            do {
                state = getState();
                if (state == 0) {
                    return false;
                }
                if (state > 0 && arg == READ) {
                    result = state - 1;
                } else if (state < 0 && arg == WRITE) {
                    result = state + 1;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } while (!compareAndSetState(state, result));
            return result == 0;
        }
    }

    private static class MultiLock implements Lock {

        private final int parameter;
        private final Sync sync;

        public MultiLock(int parameter, Sync sync) {
            this.parameter = parameter;
            this.sync = sync;
        }

        @Override
        public void lock() {
            sync.acquireShared(parameter);
        }

        @Override
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireSharedInterruptibly(parameter);
        }

        @Override
        public boolean tryLock() {
            return sync.tryAcquireShared(parameter) > 0;
        }

        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireSharedNanos(parameter, unit.toNanos(time));
        }

        @Override
        public void unlock() {
            sync.releaseShared(parameter);
        }

        @Override
        public Condition newCondition() {
            throw new UnsupportedOperationException(
                "Conditions are unsupported as there are no exclusive access"
            );
        }
    }
}

0

怎么样?

class ABSync {

    private int aHolders;
    private int bHolders;

    public synchronized void lockA() throws InterruptedException {
        while (bHolders > 0) {
            wait();
        }
        aHolders++;
    }

    public synchronized void lockB() throws InterruptedException {
        while (aHolders > 0) {
            wait();
        }
        bHolders++;
    }

    public synchronized void unlockA() {
        aHolders = Math.max(0, aHolders - 1);
        if (aHolders == 0) {
            notifyAll();
        }
    }

    public synchronized void unlockB() {
        bHolders = Math.max(0, bHolders - 1);
        if (bHolders == 0) {
            notifyAll();
        }
    }
}

更新:关于“公平性”(或者说,非饥饿性),OP的要求并没有提到。为了实现OP的要求+某种形式的公平性/非饥饿性,应该明确指定(您认为什么是公平的,当当前占主导地位和非主导地位的锁的请求流进来时应该如何行为等)。其中一种实现方式是:

class ABMoreFairSync {

    private Lock lock = new ReentrantLock(true);
    public final Part A, B;

    public ABMoreFairSync() {
        A = new Part();
        B = new Part();
        A.other = B;
        B.other = A;
    }

    private class Part {
        private Condition canGo = lock.newCondition();
        private int currentGeneration, lastGeneration;
        private int holders;
        private Part other;

        public void lock() throws InterruptedException {
            lock.lockInterruptibly();
            try {
                int myGeneration = lastGeneration;
                if (other.holders > 0 || currentGeneration < myGeneration) {
                    if (other.currentGeneration == other.lastGeneration) {
                        other.lastGeneration++;
                    }
                    while (other.holders > 0 || currentGeneration < myGeneration) {
                        canGo.await();
                    }
                }
                holders++;
            } finally {
                lock.unlock();
            }
        }

        public void unlock() throws InterruptedException {
            lock.lockInterruptibly();
            try {
                holders = Math.max(0, holders - 1);
                if (holders == 0) {
                    currentGeneration++;
                    other.canGo.signalAll();
                }
            } finally {
                lock.unlock();
            }
        }
    }
}

用于:

 sync.A.lock();
 try {
     ...
 } finally {
     sync.A.unlock();
 }

这里的“代”概念来自于《Java并发编程实战》第14.9节


1
@SergeyFedorov,您能详细解释一下吗?我真的认为我的synchronized用法足够正确了。 - starikoff
2
@SergeyFedorov 同步块将在测试的两侧强制产生内存障碍。while循环将起作用,并且实际上是一种良好的做法(参见https://dev59.com/D3NA5IYBdhLWcg3wPbWe)。 - teppic
@teppic 那是真的,但缓存问题仍然存在。 - Sergey Fedorov
1
@SergeyFedorov 任何可以优化掉监视器锁的编译器都是基本上有问题的。 - teppic
如果始终有一个线程保持锁A(这种情况下aHolders永远不会达到0,bHolders++永远不会被执行),那么lockB将永远不会返回。我喜欢代码的简单性,但它需要一部分来决定何时优先处理对锁B的锁请求而不是对锁A的锁请求,反之亦然。 - vanOekel
显示剩余2条评论

0
在我第n次尝试制作一个简单的公平实现后,我认为我明白了为什么我找不到另一个“互斥锁对”的库/示例:它需要一个非常特定的用户用例。正如OP所提到的,使用ReadWriteLock可以走得更远,而公平锁对仅在快速连续多个锁请求时有用(否则您可能会使用一个普通锁)。
下面的实现更像是一个“许可证分发器”:它不是可重入的。但是它可以被做成可重入的(如果不能,我担心我没有使代码简单易读),但是它需要一些额外的管理来处理各种情况(例如,一个线程两次锁定A,仍然需要两次解锁A,并且解锁方法需要知道何时没有更多的未解锁的锁)。当一个线程锁定A并想要锁定B时抛出死锁错误的选项可能是一个好主意。
主要思路是有一个“活动锁”,只有在没有(请求)锁时才能通过lock方法更改,并且当活动锁的未完成数量达到零时,可以通过unlock方法更改。其余部分基本上是计算锁请求并使线程等待,直到可以更改活动锁。让线程等待涉及使用InterruptedExceptions,我做出了妥协:我找不到适用于所有情况的好解决方案(例如应用程序关闭,一个被中断的线程等)。
我只进行了一些基本测试(测试类在结尾处),需要进行更多验证。
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A pair of mutual exclusive read-locks: many threads can hold a lock for A or B, but never A and B.
 * <br>Usage:<pre>
 * PairedLock plock = new PairedLock();
 * plock.lockA();
 * try {
 *     // do stuff
 * } finally {
 *     plock.unlockA();
 * }</pre>
 * This lock is not reentrant: a lock is not associated with a thread and a thread asking for the same lock 
 * might be blocked the second time (potentially causing a deadlock).
 * <p> 
 * When a lock for A is active, a lock for B will wait for all locks on A to be unlocked and vice versa.
 * <br>When a lock for A is active, and a lock for B is waiting, subsequent locks for A will wait 
 * until all (waiting) locks for B are unlocked.
 * I.e. locking is fair (in FIFO order).
 * <p>
 * See also
 * <a href="https://dev59.com/RJ3ha4cB1Zd3GeqPYbWp">stackoverflow-java-concurrency-paired-locks-with-shared-access</a>
 * 
 * @author vanOekel
 *
 */
public class PairedLock {

    static final int MAX_LOCKS = 2;
    static final int CLOSE_PERMITS = 10_000;

    /** Use a fair lock to keep internal state instead of the {@code synchronized} keyword. */
    final ReentrantLock state = new ReentrantLock(true);

    /** Amount of threads that have locks. */
    final int[] activeLocks = new int[MAX_LOCKS];

    /** Amount of threads waiting to receive a lock. */
    final int[] waitingLocks = new int[MAX_LOCKS];

    /** Threads block on a semaphore until locks are available. */
    final Semaphore[] waiters = new Semaphore[MAX_LOCKS];

    int activeLock;
    volatile boolean closed;

    public PairedLock() {
        super();
        for (int i = 0; i < MAX_LOCKS; i++) {
            // no need for fair semaphore: unlocks are done for all in one go.
            waiters[i] = new Semaphore(0);
        }
    }

    public void lockA() throws InterruptedException { lock(0); }
    public void lockB() throws InterruptedException { lock(1); }

    public void lock(int lockNumber) throws InterruptedException {

        if (lockNumber < 0 || lockNumber >= MAX_LOCKS) {
            throw new IllegalArgumentException("Lock number must be 0 or less than " + MAX_LOCKS);
        } else if (isClosed()) {
            throw new IllegalStateException("Lock closed.");
        }
        boolean wait = false;
        state.lock();
        try {
            if (nextLockIsWaiting()) {
                wait = true;
            } else if (activeLock == lockNumber) {
                activeLocks[activeLock]++;
            } else if (activeLock != lockNumber && activeLocks[activeLock] == 0) {
                // nothing active and nobody waiting - safe to switch to another active lock
                activeLock = lockNumber;
                activeLocks[activeLock]++;
            } else {
                // with only two locks this means this is the first lock that needs an active-lock switch.
                // in other words:
                // activeLock != lockNumber && activeLocks[activeLock] > 0 && waitingLocks[lockNumber] == 0
                wait = true;
            }
            if (wait) {
                waitingLocks[lockNumber]++;
            }
        } finally {
            state.unlock();
        }
        if (wait) {
            waiters[lockNumber].acquireUninterruptibly();
            // there is no easy way to bring this lock back into a valid state when waiters do no get a lock.
            // so for now, use the closed state to make this lock unusable any further.
            if (closed) {
                throw new InterruptedException("Lock closed.");
            }
        }
    }

    protected boolean nextLockIsWaiting() {
        return (waitingLocks[nextLock(activeLock)] > 0);
    }

    protected int nextLock(int lockNumber) {
        return (lockNumber == 0 ? 1 : 0);
    }

    public void unlockA() { unlock(0); }
    public void unlockB() { unlock(1); }

    public void unlock(int lockNumber) {

        // unlock is called in a finally-block and should never throw an exception.
        if (lockNumber < 0 || lockNumber >= MAX_LOCKS) {
            System.out.println("Cannot unlock lock number " + lockNumber);
            return;
        }
        state.lock();
        try {
            if (activeLock != lockNumber) {
                System.out.println("ERROR: invalid lock state: no unlocks for inactive lock expected (active: " + activeLock + ", unlock: " + lockNumber + ").");
                return;
            }
            activeLocks[lockNumber]--;
            if (activeLocks[activeLock] == 0 && nextLockIsWaiting()) {
                activeLock = nextLock(lockNumber);
                waiters[activeLock].release(waitingLocks[activeLock]);
                activeLocks[activeLock] += waitingLocks[activeLock];
                waitingLocks[activeLock] = 0;
            } else if (activeLocks[lockNumber] < 0) {
                System.out.println("ERROR: to many unlocks for lock number " + lockNumber);
                activeLocks[lockNumber] = 0;
            }
        } finally {
            state.unlock();
        }
    }

    public boolean isClosed() { return closed; }

    /**
     * All threads waiting for a lock will be unblocked and an {@link InterruptedException} will be thrown.
     * Subsequent calls to the lock-method will throw an {@link IllegalStateException}.
     */
    public synchronized void close() {

        if (!closed) {
            closed = true;
            for (int i = 0; i < MAX_LOCKS; i++) {
                waiters[i].release(CLOSE_PERMITS);
            }
        }
    }

    @Override
    public String toString() {

        StringBuilder sb = new StringBuilder(this.getClass().getSimpleName());
        sb.append("=").append(this.hashCode());
        state.lock();
        try {
            sb.append(", active=").append(activeLock).append(", switching=").append(nextLockIsWaiting());
            sb.append(", lockA=").append(activeLocks[0]).append("/").append(waitingLocks[0]);
            sb.append(", lockB=").append(activeLocks[1]).append("/").append(waitingLocks[1]);
        } finally {
            state.unlock();
        }
        return sb.toString();
    }

}

测试类(YMMV - 在我的系统上运行良好,但由于线程启动和运行速度的快慢,可能会在您的系统上死锁):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PairedLockTest {

    private static final Logger log = LoggerFactory.getLogger(PairedLockTest.class);

    public static final ThreadPoolExecutor tp = (ThreadPoolExecutor) Executors.newCachedThreadPool();

    public static void main(String[] args) {

        try {
            new PairedLockTest().test();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            tp.shutdownNow();
        }
    }

    PairedLock mlock = new PairedLock();

    public void test() throws InterruptedException {

        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(2);
        mlock.lockA();
        try {
            logLock("la1 ");
            mlock.lockA();
            try {
                lockAsync(start, null, done, 1);
                await(start);
                logLock("la2 ");
            } finally {
                mlock.unlockA();
            }
            lockAsync(null, null, done, 0);
        } finally {
            mlock.unlockA();
        }
        await(done);
        logLock();
    }

    void lockAsync(CountDownLatch start, CountDownLatch locked, CountDownLatch unlocked, int lockNumber) {

        tp.execute(() -> {
            countDown(start);
            await(start);
            //log.info("Locking async " + lockNumber);
            try {
                mlock.lock(lockNumber);
                try {
                    countDown(locked);
                    logLock("async " + lockNumber + " ");
                } finally {
                    mlock.unlock(lockNumber);
                    //log.info("Unlocked async " + lockNumber);
                    //logLock("async " + lockNumber + " ");
                }
                countDown(unlocked);
            } catch (InterruptedException ie) {
                log.warn(ie.toString());
            }
        });
    }

    void logLock() {
        logLock("");
    }

    void logLock(String msg) {
        log.info(msg + mlock.toString());
    }

    static void countDown(CountDownLatch l) {
        if (l != null) {
            l.countDown();
        }
    }

    static void await(CountDownLatch l) {

        if (l == null) {
            return;
        }
        try {
            l.await();
        } catch (InterruptedException e) {
            log.error(e.toString(), e.getCause());
        }
    }

}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接