如何“取消”CountDownLatch?

18

我有多个消费者线程在等待大小为1的CountDownLatch,使用await()方法。同时,我有一个单独的生产者线程,当成功完成时调用countDown()方法。

当没有错误时,这样做非常好。

但是,如果生产者检测到错误,我希望它能够向消费者线程发出错误信号。理想情况下,我可以让生产者调用类似于abortCountDown()的方法,并使所有消费者接收InterruptedException或其他异常。我不想调用countDown()方法,因为这需要所有消费者线程在调用await()方法后再进行额外的手动检查以确保成功。我更希望他们只接收异常,因为他们已经知道如何处理异常。

我知道CountDownLatch没有提供中止功能。是否有其他同步原语可以轻松适应,以有效创建支持中止倒计时的CountDownLatch

6个回答

17

JB Nizet给出了一份很好的答案。 我对他的答案进行了改进。结果是一个CountDownLatch的子类,名为AbortableCountDownLatch,它在类中添加了一个"abort()"方法,该方法将导致所有等待latch的线程收到AbortException(InterruptedException的子类)。

另外,与JB的类不同,AbortableCountDownLatch会立即中止所有阻塞的线程,而不是等待倒计时达到零(适用于使用count>1的情况)。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AbortableCountDownLatch extends CountDownLatch {
    protected boolean aborted = false;

    public AbortableCountDownLatch(int count) {
        super(count);
    }


   /**
     * Unblocks all threads waiting on this latch and cause them to receive an
     * AbortedException.  If the latch has already counted all the way down,
     * this method does nothing.
     */
    public void abort() {
        if( getCount()==0 )
            return;

        this.aborted = true;
        while(getCount()>0)
            countDown();
    }


    @Override
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        final boolean rtrn = super.await(timeout,unit);
        if (aborted)
            throw new AbortedException();
        return rtrn;
    }

    @Override
    public void await() throws InterruptedException {
        super.await();
        if (aborted)
            throw new AbortedException();
    }


    public static class AbortedException extends InterruptedException {
        public AbortedException() {
        }

        public AbortedException(String detailMessage) {
            super(detailMessage);
        }
    }
}

如何使用这个类,我的情况是我有一个列表,而且该列表在实时动态更新。当列表中出现自动事件时,我必须等待2分钟,但如果在等待时间内出现用户手动事件,则必须立即中断等待并继续进行。 - Swapnil Gangrade
1
我不相信这个类是线程安全的。例如,如果一个或多个线程同时调用countDown()并且在此同时调用了abort(),则abort()中的while循环存在潜在问题。 - DaedalusAlpha

13

使用 CountDownLatch 内部将此行为封装在一个特定的高级类中:

public class MyLatch {
    private CountDownLatch latch;
    private boolean aborted;
    ...

    // called by consumers
    public void await() throws AbortedException {
        latch.await();
        if (aborted) {
            throw new AbortedException();
        }
    }

    // called by producer
    public void abort() {
        this.aborted = true;
        latch.countDown();
    }

    // called by producer
    public void succeed() {
        latch.countDown();
    }
}

5
不行,因为线程安全由CountDownLatch确保:countDown方法确保在线程之间存在内存障碍。Javadoc说:“在调用countDown()之前的线程操作发生在相应await()成功返回后的操作之前。” - JB Nizet
您IP地址为143.198.54.68,由于运营成本限制,当前对于免费用户的使用频率限制为每个IP每72小时10次对话,如需解除限制,请点击左下角设置图标按钮(手机用户先点击左上角菜单按钮)。 - Timmos
@Timmos 是的,它是安全的:CountDownLatch 保证在 countDown() 之前所做的所有更改都可以被等待它的线程看到。 - JB Nizet
在我的情况下,我想它仍然不安全:工作线程并行写入列表,并且工作线程之间没有 happens-before 关系,只有与主线程的关系。但是,您对 happens-before 的评论是一个很好的补充。到目前为止,我已经完成了《Java并发编程实战》以及涵盖此主题的部分,确实确认了您在此处所说的内容,即第16章或更具体地说是第16.1.4小节中的“同步猪跑”。在这个例子中,布尔值正在利用CountDownLatch的同步。 - Timmos
我错过了工作线程并行写入列表的部分。那么,确实需要同步。但这与CountDownLatch没有太大关系。不安全的部分发生在await()调用返回之后。 - JB Nizet

4

您可以创建一个围绕CountDownLatch的包装器,提供取消等待者的能力。它需要跟踪等待线程并在超时时释放它们,同时记住该闩锁被取消,因此将来对await的调用将立即中断。

public class CancellableCountDownLatch
{
    final CountDownLatch latch;
    final List<Thread> waiters;
    boolean cancelled = false;

    public CancellableCountDownLatch(int count) {
        latch = new CountDownLatch(count);
        waiters = new ArrayList<Thread>();
    }

    public void await() throws InterruptedException {
        try {
            addWaiter();
            latch.await();
        }
        finally {
            removeWaiter();
        }
    }

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            addWaiter();
            return latch.await(timeout, unit);
        }
        finally {
            removeWaiter();
        }
    }

    private synchronized void addWaiter() throws InterruptedException {
        if (cancelled) {
            Thread.currentThread().interrupt();
            throw new InterruptedException("Latch has already been cancelled");
        }
        waiters.add(Thread.currentThread());
    }

    private synchronized void removeWaiter() {
        waiters.remove(Thread.currentThread());
    }

    public void countDown() {
        latch.countDown();
    }

    public synchronized void cancel() {
        if (!cancelled) {
            cancelled = true;
            for (Thread waiter : waiters) {
                waiter.interrupt();
            }
            waiters.clear();
        }
    }

    public long getCount() {
        return latch.getCount();
    }

    @Override
    public String toString() {
        return latch.toString();
    }
}

请用示例解释如何使用它。我的情况是我有一个列表,列表在实时动态更新。列表中会出现自动事件,我必须等待2分钟,但如果在等待时间内出现用户手动事件,则必须立即中断等待并立即进行。 - Swapnil Gangrade

0
你可以使用一个允许访问其受保护的getWaitingThreads方法的ReentrantLock来自己实现一个CountDownLatch
例如:
public class FailableCountDownLatch {
    private static class ConditionReentrantLock extends ReentrantLock {
        private static final long serialVersionUID = 2974195457854549498L;

        @Override
        public Collection<Thread> getWaitingThreads(Condition c) {
            return super.getWaitingThreads(c);
        }
    }

    private final ConditionReentrantLock lock = new ConditionReentrantLock();
    private final Condition countIsZero = lock.newCondition();
    private long count;

    public FailableCountDownLatch(long count) {
        this.count = count;
    }

    public void await() throws InterruptedException {
        lock.lock();
        try {
            if (getCount() > 0) {
                countIsZero.await();
            }
        } finally {
            lock.unlock();
        }
    }

    public boolean await(long time, TimeUnit unit) throws InterruptedException {
        lock.lock();
        try {
            if (getCount() > 0) {
                return countIsZero.await(time, unit);
            }
        } finally {
            lock.unlock();
        }
        return true;
    }

    public long getCount() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    public void countDown() {
        lock.lock();
        try {
            if (count > 0) {
                count--;

                if (count == 0) {
                    countIsZero.signalAll();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    public void abortCountDown() {
        lock.lock();
        try {
            for (Thread t : lock.getWaitingThreads(countIsZero)) {
                t.interrupt();
            }
        } finally {
            lock.unlock();
        }
    }
}

你可能想要更改这个类,在它被取消后,对await的新调用抛出InterruptedException。如果需要该功能,甚至可以让这个类扩展CountDownLatch


0
这里有一个简单的选项,它包装了CountDownLatch。它类似于第二个答案,但不需要重复调用countdown,如果latch是一个很大的数字,这可能会非常昂贵。它使用AtomicInteger来进行实际计数,并配合一个初始值为1的CountDownLatch。

https://github.com/scottf/CancellableCountDownLatch/blob/main/CancellableCountDownLatch.java

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CancellableCountDownLatch {
    private final AtomicInteger count;
    private final CountDownLatch cdl;

    public CancellableCountDownLatch(int count) {
        this.count = new AtomicInteger(count);
        cdl = new CountDownLatch(1);
    }

    public void cancel() {
        count.set(0);
        cdl.countDown();
    }

    public void await() throws InterruptedException {
        cdl.await();
    }

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return cdl.await(timeout, unit);
    }

    public void countDown() {
        if (count.decrementAndGet() <= 0) {
            cdl.countDown();
        }
    }

    public long getCount() {
        return Math.max(count.get(), 0);
    }

    @Override
    public String toString() {
        return super.toString() + "[Count = " + getCount() + "]";
    }
}

0

自从Java 8以来,您可以使用CompletableFuture来实现这一点。一个或多个线程可以调用阻塞的get()方法:

CompletableFuture<Void> cf = new CompletableFuture<>();
try {
  cf.get();
} catch (ExecutionException e) {
  //act on error
}

另一个线程可以通过 cf.complete(null) 成功完成它,也可以通过 cf.completeExceptionally(new MyException()) 异常完成它。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接