我在Java中提交了许多作业到执行器服务(ExecutorService),我希望能够临时暂停这些作业。最好的方法是什么?我该如何恢复?或者我完全错了吗?对于我想要实现的暂停/恢复执行服务的能力,应该遵循其他模式吗?
我在Java中提交了许多作业到执行器服务(ExecutorService),我希望能够临时暂停这些作业。最好的方法是什么?我该如何恢复?或者我完全错了吗?对于我想要实现的暂停/恢复执行服务的能力,应该遵循其他模式吗?
Guard
,而不是javadocs中的一个 Condition
;
(2) 你在if外使用了 enterIf
(这是完全错误的);
(3) Monitor
的 leave
使用的是 signal
而不是 signalAll
(这里实际上需要 signalAll
);
最后 (4) 如果你已经基于 notPaused
进入了 Monitor
,为什么要等待 notPaused
呢?(直接离开即可)。总的来说,我认为在这里使用 Monitor
不是一个好选择... - Corin我对你的答案提出了一些批评,但它们并不是很有建设性...因此,这是我的解决方案。我会使用像这样的类,然后在需要暂停功能时随时调用checkIn
。在GitHub上找到它!
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* Provides a mechanism to pause multiple threads.
* If wish your thread to participate, then it must regularly check in with an instance of this object.
*
* @author Corin Lawson <corin@phiware.com.au>
*/
public class Continue {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public void checkIn() throws InterruptedException {
if (isPaused) {
pauseLock.lock();
try {
while (isPaused)
unpaused.await();
} finally {
pauseLock.unlock();
}
}
}
public void checkInUntil(Date deadline) throws InterruptedException {
if (isPaused) {
pauseLock.lock();
try {
while (isPaused)
unpaused.awaitUntil(deadline);
} finally {
pauseLock.unlock();
}
}
}
public void checkIn(long nanosTimeout) throws InterruptedException {
if (isPaused) {
pauseLock.lock();
try {
while (isPaused)
unpaused.awaitNanos(nanosTimeout);
} finally {
pauseLock.unlock();
}
}
}
public void checkIn(long time, TimeUnit unit) throws InterruptedException {
if (isPaused) {
pauseLock.lock();
try {
while (isPaused)
unpaused.await(time, unit);
} finally {
pauseLock.unlock();
}
}
}
public void checkInUninterruptibly() {
if (isPaused) {
pauseLock.lock();
try {
while (isPaused)
unpaused.awaitUninterruptibly();
} finally {
pauseLock.unlock();
}
}
}
public boolean isPaused() {
return isPaused;
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
if (isPaused) {
isPaused = false;
unpaused.signalAll();
}
} finally {
pauseLock.unlock();
}
}
}
例如:
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
public class PausableExecutor extends ScheduledThreadPoolExecutor {
private Continue cont;
public PausableExecutor(int corePoolSize, ThreadFactory threadFactory, Continue c) {
super(corePoolSize, threadFactory);
cont = c;
}
protected void beforeExecute(Thread t, Runnable r) {
cont.checkIn();
super.beforeExecute(t, r);
}
}
Continue
的 pause
方法暂停多个线程。boolean isPaused
应该是 volatile 的吗?还是 ReentrantLock
充当了内存屏障?例如,线程 A 调用 pause()
或 resume()
,线程 B 调用 checkIn()
,线程 C 调用 isPaused()
。 - Barn我在寻找executor中的暂停/恢复功能,但需要额外的能力来等待当前正在处理的任何任务。以下是其他优秀实现方案的变体(来自SO),并添加了等待函数。我已经在只有单个线程的executor上测试了它。因此,基本用法是:
executor.pause();
executor.await(10000); // blocks till current tasks processing ends
类代码:
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class PausableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
public boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
private Latch activeTasksLatch = new Latch();
private class Latch {
private final Object synchObj = new Object();
private int count;
public boolean awaitZero(long waitMS) throws InterruptedException {
long startTime = System.currentTimeMillis();
synchronized (synchObj) {
while (count > 0) {
if ( waitMS != 0) {
synchObj.wait(waitMS);
long curTime = System.currentTimeMillis();
if ( (curTime - startTime) > waitMS ) {
return count <= 0;
}
}
else
synchObj.wait();
}
return count <= 0;
}
}
public void countDown() {
synchronized (synchObj) {
if (--count <= 0) {
// assert count >= 0;
synchObj.notifyAll();
}
}
}
public void countUp() {
synchronized (synchObj) {
count++;
}
}
}
/**
* Default constructor for a simple fixed threadpool
*/
public PausableScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize);
}
/**
* Executed before a task is assigned to a thread.
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
pauseLock.lock();
try {
while (isPaused)
unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
activeTasksLatch.countUp();
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
try {
super.afterExecute(r, t);
}
finally {
activeTasksLatch.countDown();
}
}
/**
* Pause the threadpool. Running tasks will continue running, but new tasks
* will not start untill the threadpool is resumed.
*/
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
/**
* Wait for all active tasks to end.
*/
public boolean await(long timeoutMS) {
// assert isPaused;
try {
return activeTasksLatch.awaitZero(timeoutMS);
} catch (InterruptedException e) {
// log e, or rethrow maybe
}
return false;
}
/**
* Resume the threadpool.
*/
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}
问题在于Runnable/Callable本身需要检查何时暂停/恢复。话虽如此,有许多方法可以做到这一点,具体取决于您的要求。无论您选择哪种解决方案,都需要使等待可中断,以便线程可以被干净地关闭。
我知道这个问题早已过时,但我尝试了所有的答案,都无法满足我的需求,我需要一个可暂停计时器来执行任务,而它们在恢复后会一次性抛弃掉之前应该按计划执行的所有数据。
相反,我在GitHub上找到了这个Timer
类这里。对我来说它非常有效。
*我并没有编写这段代码,只是找到了它。