我实现了一个最小化的工作示例,基本上做了我认为你想要的事情。
一个任务接口(类似于可运行接口,只是通过传递上下文来执行等待)
package io.medev.stackoverflow;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
public interface Task {
static Task wrap(Runnable runnable) {
return wrap(runnable, Long.MAX_VALUE);
}
static Task wrap(Runnable runnable, long guessedExecutionTimeMillis) {
return new Task() {
@Override
public long guessExecutionTimeMillis() {
return guessedExecutionTimeMillis;
}
@Override
public void run(Context context) {
runnable.run();
}
};
}
long guessExecutionTimeMillis();
void run(Context context);
interface Context {
void idle(BooleanSupplier condition) throws InterruptedException;
void idle(long timeout, TimeUnit timeUnit) throws InterruptedException;
void idle(BooleanSupplier condition, long timeout, TimeUnit timeUnit) throws InterruptedException;
}
}
还有一个基本的固定线程池执行器 - 但你必须依赖于具体的实现:
package io.medev.stackoverflow;
import java.util.Comparator;
import java.util.concurrent.*;
import java.util.function.BooleanSupplier;
public class TimeEfficientExecutor implements Executor {
private final BlockingQueue<Task> taskQueue;
private final CountDownLatch latch;
private volatile boolean alive;
public TimeEfficientExecutor(int threads) {
this.taskQueue = new PriorityBlockingQueue<>(10, Comparator.comparingLong(Task::guessExecutionTimeMillis));
this.latch = new CountDownLatch(threads);
this.alive = true;
for (int i = 0; i < threads; i++) {
Thread thread = new Thread(new TimeEfficientExecutorRunnable());
thread.start();
}
}
@Override
public void execute(Runnable runnable) {
execute(Task.wrap(runnable));
}
public void execute(Runnable runnable, long guessedExecutionTimeMillis) {
execute(Task.wrap(runnable, guessedExecutionTimeMillis));
}
public void execute(Task task) {
this.taskQueue.offer(task);
}
public void shutdown() {
this.alive = false;
}
public void awaitShutdown() throws InterruptedException {
this.latch.await();
}
public void awaitShutdown(long timeout, TimeUnit timeUnit) throws InterruptedException {
this.latch.await(timeout, timeUnit);
}
private class TimeEfficientExecutorRunnable implements Runnable {
@Override
public void run() {
try {
while (TimeEfficientExecutor.this.alive) {
Task task = TimeEfficientExecutor.this.taskQueue.poll();
if (task != null) {
try {
task.run(new IdleTaskContext());
} catch (Exception e) {
}
}
}
} finally {
TimeEfficientExecutor.this.latch.countDown();
}
}
}
private class IdleTaskContext implements Task.Context {
@Override
public void idle(BooleanSupplier condition) throws InterruptedException {
idle(condition, Long.MAX_VALUE);
}
@Override
public void idle(long timeout, TimeUnit timeUnit) throws InterruptedException {
idle(() -> false, timeout, timeUnit);
}
@Override
public void idle(BooleanSupplier condition, long timeout, TimeUnit timeUnit) throws InterruptedException {
idle(condition, System.currentTimeMillis() + timeUnit.toMillis(timeout));
}
private void idle(BooleanSupplier condition, long idleUntilTs) throws InterruptedException {
long leftMillis = idleUntilTs - System.currentTimeMillis();
while (TimeEfficientExecutor.this.alive && !condition.getAsBoolean() && leftMillis >= 1L) {
Task task = TimeEfficientExecutor.this.taskQueue.poll(leftMillis, TimeUnit.MILLISECONDS);
leftMillis = idleUntilTs - System.currentTimeMillis();
if (task != null) {
if (leftMillis >= 1L && task.guessExecutionTimeMillis() < leftMillis) {
task.run(new IdleTaskContext());
} else {
TimeEfficientExecutor.this.taskQueue.offer(task);
}
}
}
}
}
}
请注意,您不能仅仅通过步进堆栈来控制程序的执行流程 - 堆栈是绑定在执行线程上的。这意味着,如果某个“子”任务开始空闲,就不可能跳回到底层的空闲任务中。您必须“信任”每个任务在
guessExecutionTimeMillis
方法中返回的结果。
由于执行器中使用了优先队列,因此队列总是返回执行时间最短的任务。感谢优先队列的使用。