在Java中如何暂停/恢复ExecutorService中的所有线程?

28

我在Java中提交了许多作业到执行器服务(ExecutorService),我希望能够临时暂停这些作业。最好的方法是什么?我该如何恢复?或者我完全错了吗?对于我想要实现的暂停/恢复执行服务的能力,应该遵循其他模式吗?


1
你的意思是阻止新的工作运行,还是暂停已经在运行的工作? - Jon Skeet
暂停正在运行的作业。暂停/恢复可能会在“关闭”之后被调用。 - pathikrit
在这种情况下,你启动任务的方式几乎无关紧要。你需要编写暂停代码 - 例如,每个任务可能会定期检查一个“是否应该暂停”的标志。当然,这仍然不会是即时的。 - Jon Skeet
嗯,我想我可以创建自己特殊类型的“Runnables”,它们可以理解全局暂停/恢复标志。我希望能够使用我拥有的“Futures”列表或通过“ExecutorService”本身以更清晰的方式来完成它。 - pathikrit
5个回答

19
为了回答自己的问题,我在ThreadPoolExecutor的javadocs中找到了一个PausableThreadPoolExecutor的示例。下面是我使用Guava的Monitors改写后的版本:

import com.google.common.util.concurrent.Monitor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

public class PausableExecutor extends ScheduledThreadPoolExecutor {

    private boolean isPaused;

    private final Monitor monitor = new Monitor();
    private final Monitor.Guard paused = new Monitor.Guard(monitor) {
        @Override
        public boolean isSatisfied() {
            return isPaused;
        }
    };

    private final Monitor.Guard notPaused = new Monitor.Guard(monitor) {
        @Override
        public boolean isSatisfied() {
            return !isPaused;
        }
    };

    public PausableExecutor(int corePoolSize, ThreadFactory threadFactory) {
        super(corePoolSize, threadFactory);
    }

    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        monitor.enterWhenUninterruptibly(notPaused);
        try {
            monitor.waitForUninterruptibly(notPaused);
        } finally {
            monitor.leave();
        }
    }

    public void pause() {
        monitor.enterIf(notPaused);
        try {
            isPaused = true;
        } finally {
            monitor.leave();
        }
    }

    public void resume() {
        monitor.enterIf(paused);
        try {
            isPaused = false;
        } finally {
            monitor.leave();
        }
    }
}

3
你的解决方案与javadocs中的示例存在一些重大差异... (1) 你使用了两个 Guard,而不是javadocs中的一个 Condition; (2) 你在if外使用了 enterIf(这是完全错误的); (3) Monitorleave使用的是 signal 而不是 signalAll(这里实际上需要 signalAll); 最后 (4) 如果你已经基于 notPaused 进入了 Monitor,为什么要等待 notPaused 呢?(直接离开即可)。总的来说,我认为在这里使用 Monitor 不是一个好选择... - Corin
  1. 我认为Guava的Monitor/Guard比Condition更清晰。这只是个人偏好。
  2. 你是指在外部使用try而不是if吗?我使用了Guava文档中记录的惯用法Guard。
  3. 为什么要使用signalAll?这个Executor只涉及到它包含的线程,如果我们使用signal或signalAll对它们来说都没有影响。
  4. 如果您查看Monitor文档 - http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/util/concurrent/Monitor.html - Google自己建议使用单独的监视器,即使其中一个是另一个的布尔相反。
- pathikrit
你好,我使用了同一个ThreadPoolExecutor并向其中添加了一个Runnable线程。但是我无法暂停和恢复Runnable线程。请问您能指导我如何实现吗? - sssvrock
@sssvrock,该实现暂停/恢复执行器。当暂停时,它会在beforeExecute处等待。该实现不会在线程启动后暂停线程。Runnable线程不能(也不应该)在运行时暂停。请参见https://docs.oracle.com/javase/7/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html 要对运行中的线程进行更复杂的控制,可以查看https://dev59.com/umQn5IYBdhLWcg3wiHjl - Kartal Tabak

11

我对你的答案提出了一些批评,但它们并不是很有建设性...因此,这是我的解决方案。我会使用像这样的类,然后在需要暂停功能时随时调用checkIn。在GitHub上找到它!

import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Provides a mechanism to pause multiple threads.
 * If wish your thread to participate, then it must regularly check in with an instance of this object.
 * 
 * @author Corin Lawson <corin@phiware.com.au>
 */
public class Continue {
    private boolean isPaused;
    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition unpaused = pauseLock.newCondition();

    public void checkIn() throws InterruptedException {
        if (isPaused) {
            pauseLock.lock();
            try {
                while (isPaused)
                    unpaused.await();
            } finally {
                pauseLock.unlock();
            }
        }
    }

    public void checkInUntil(Date deadline) throws InterruptedException {
        if (isPaused) {
            pauseLock.lock();
            try {
                while (isPaused)
                    unpaused.awaitUntil(deadline);
            } finally {
                pauseLock.unlock();
            }
        }
    }

    public void checkIn(long nanosTimeout) throws InterruptedException {
        if (isPaused) {
            pauseLock.lock();
            try {
                while (isPaused)
                    unpaused.awaitNanos(nanosTimeout);
            } finally {
                pauseLock.unlock();
            }
        }
    }

    public void checkIn(long time, TimeUnit unit) throws InterruptedException {
        if (isPaused) {
            pauseLock.lock();
            try {
                while (isPaused)
                    unpaused.await(time, unit);
            } finally {
                pauseLock.unlock();
            }
        }
    }

    public void checkInUninterruptibly() {
        if (isPaused) {
            pauseLock.lock();
            try {
                while (isPaused)
                    unpaused.awaitUninterruptibly();
            } finally {
                pauseLock.unlock();
            }
        }
    }

    public boolean isPaused() {
        return isPaused;
    }

    public void pause() {
        pauseLock.lock();
        try {
            isPaused = true;
        } finally {
            pauseLock.unlock();
        }
    }

    public void resume() {
        pauseLock.lock();
        try {
            if (isPaused) {
                isPaused = false;
                unpaused.signalAll();
            }
        } finally {
            pauseLock.unlock();
        }
    }
}

例如:

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

public class PausableExecutor extends ScheduledThreadPoolExecutor {
    private Continue cont;

    public PausableExecutor(int corePoolSize, ThreadFactory threadFactory, Continue c) {
        super(corePoolSize, threadFactory);
        cont = c;
    }

    protected void beforeExecute(Thread t, Runnable r) {
        cont.checkIn();
        super.beforeExecute(t, r);
    }
}

这样做的额外好处是,您可以通过一次调用 Continuepause 方法暂停多个线程。

1
谢谢,我刚刚使用了你的示例来实现这个功能,但是我有几个评论。beforeExecute必须捕获InterruptedException才能编译。不清楚你不需要子类化ScheduledThreadPoolExecutor,你可以直接使用ThreadPoolExecutor,这就是我正在使用的。PausableExcecutor只会暂停已提交但未启动的任务的执行,要暂停已经启动的任务,您需要在任务代码本身中调用checkIn,我使用checkInInterruptably()来实现这一点,但不确定是否是一个好主意。 - Paul Taylor
感谢分享 - 这是我尝试过的许多方法中的第一个有效的方法。 - Adam Rabung
boolean isPaused 应该是 volatile 的吗?还是 ReentrantLock 充当了内存屏障?例如,线程 A 调用 pause()resume(),线程 B 调用 checkIn(),线程 C 调用 isPaused() - Barn
嗨,我已经尝试过使用@pathiikrit和Corin的解决方案来实现使用线程池管理器暂停和恢复可运行线程,但在我的情况下根本不起作用。 - sssvrock
这可以防止计划任务执行,但不能防止暂停的任务在队列中积累。例如,如果您按固定速率每秒安排一次任务,然后暂停五秒钟,当您取消暂停时,您的可运行程序将触发五次。 - robross0606

5

我在寻找executor中的暂停/恢复功能,但需要额外的能力来等待当前正在处理的任何任务。以下是其他优秀实现方案的变体(来自SO),并添加了等待函数。我已经在只有单个线程的executor上测试了它。因此,基本用法是:

executor.pause();
executor.await(10000); // blocks till current tasks processing ends

类代码:

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class PausableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {      
  public boolean isPaused;
  private ReentrantLock pauseLock = new ReentrantLock();
  private Condition unpaused = pauseLock.newCondition();
  private Latch activeTasksLatch = new Latch();

  private class Latch {
    private final Object synchObj = new Object();
    private int count;

    public boolean awaitZero(long waitMS) throws InterruptedException {
      long startTime = System.currentTimeMillis();
      synchronized (synchObj) {
        while (count > 0) {
          if ( waitMS != 0) {
            synchObj.wait(waitMS);
            long curTime = System.currentTimeMillis();
            if ( (curTime - startTime) > waitMS ) {                
              return count <= 0;
            }
          }
          else
            synchObj.wait();
        }
        return count <= 0; 
      }
    }
    public void countDown() {
      synchronized (synchObj) {
        if (--count <= 0) {
          // assert count >= 0;              
          synchObj.notifyAll();
        }
      }
    }
    public void countUp() {
      synchronized (synchObj) {
        count++;
      }
    }    
  }

  /**
   * Default constructor for a simple fixed threadpool
   */
  public PausableScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize);
  }

  /**
   * Executed before a task is assigned to a thread.
   */
  @Override
  protected void beforeExecute(Thread t, Runnable r) {
    pauseLock.lock();
    try {
      while (isPaused)
        unpaused.await();
    } catch (InterruptedException ie) {
      t.interrupt();
    } finally {
      pauseLock.unlock();
    }

    activeTasksLatch.countUp();
    super.beforeExecute(t, r);
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    try {
      super.afterExecute(r, t);
    }
    finally {
      activeTasksLatch.countDown();
    }
  }

  /**
   * Pause the threadpool. Running tasks will continue running, but new tasks
   * will not start untill the threadpool is resumed.
   */
  public void pause() {
    pauseLock.lock();
    try {
      isPaused = true;
    } finally {
      pauseLock.unlock();
    }
  }

  /**
   * Wait for all active tasks to end.
   */ 
  public boolean await(long timeoutMS) {
    // assert isPaused;
    try {
      return activeTasksLatch.awaitZero(timeoutMS);
    } catch (InterruptedException e) {
      // log e, or rethrow maybe
    }
    return false;
  }

  /**
   * Resume the threadpool.
   */
  public void resume() {
    pauseLock.lock();
    try {
      isPaused = false;
      unpaused.signalAll();
    } finally {
      pauseLock.unlock();
    }
  }

}

这看起来不错。您或其他人测试过这个更彻底吗?有没有修订或修复?因为它不会引入另一个库,所以我现在要使用它。 - Mike
它在相当大的应用程序中使用,目前没有任何问题。如果这段代码有任何错误,我也愿意听取意见。 - marcinj
@marcinj 我正在尝试您的执行器代码。暂停和恢复都很好用。但是我注意到,当我在暂停时调用shutDownNow()时,它会在实际关闭之前恢复并运行一些任务。有什么办法可以防止这种情况发生吗? - ProgrAmmar
@ProgrAmmar 我尝试使用这段代码进行复现:http://melpon.org/wandbox/permlink/XHa9NwmI7n1WAr3F,但是我失败了 - 你能看看那是否是问题的原因吗?据我理解,“test 4”“test 5”“test 6”应该被写入控制台。这些是不应该被执行的任务的输出,但它们现在被写入了。 - marcinj
@marcinj,我无法打开您的链接。所以我在这里创建了自己的示例:http://pastebin.com/AY6r1zGD。我已经从您的代码中创建了一个FixedThreadPoolExecutor。您可以看到,在运行它时,一些任务会在shutDownNow()之后被调用。 - ProgrAmmar

4

问题在于Runnable/Callable本身需要检查何时暂停/恢复。话虽如此,有许多方法可以做到这一点,具体取决于您的要求。无论您选择哪种解决方案,都需要使等待可中断,以便线程可以被干净地关闭。


0

我知道这个问题早已过时,但我尝试了所有的答案,都无法满足我的需求,我需要一个可暂停计时器来执行任务,而它们在恢复后会一次性抛弃掉之前应该按计划执行的所有数据。

相反,我在GitHub上找到了这个Timer这里。对我来说它非常有效。

*我并没有编写这段代码,只是找到了它。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接