如何在Java 5中使用ExecutorService实现任务优先级?

47
我正在实现一个线程池机制,希望能够执行具有不同优先级的任务。我想要一种好的机制,通过它我可以将高优先级任务提交到服务中,并在其他任务之前调度它。任务的优先级是任务本身固有的属性(无论我将该任务表示为Callable还是Runnable对我来说都不重要)。
表面上看,我可以在ThreadPoolExecutor中使用PriorityBlockingQueue作为任务队列,但是该队列包含Runnable对象,这些对象可能是我提交给它的Runnable任务,也可能不是。而且,如果我提交了Callable任务,则不清楚如何映射它们。
有没有办法做到这一点?我真的不想自己开发它,因为那样会更容易出错。
(另外;是的,我知道这样做可能会导致低优先级工作饥饿。有合理公平性保证的解决方案可以获得额外的分数(?!)。)

3
很有趣的问题。在我看来,这似乎是API中的一个小疏忽。 - Adam Jaskiewicz
如果我必须猜测为什么它不是API的一部分,我会说这可能是因为饥饿问题比较棘手。他们需要提供一个新的公平和升级原语集;例如must-execute-by和may-be-indefinitely-deferred(请注意,我是凭空想象这些名称的)。我可能希望他们这样做了,但我不怪他们 :) - Chris R
是的,那很有道理。虽然这似乎是一个不错的东西,但当你认为你需要在Java中实质上编写CPU调度算法时,你可能正在做一些错误的事情。 - Adam Jaskiewicz
6
JDK 6通过在AbstractExecutorService中提供新的newTaskFor方法来纠正这个遗漏,让您可以控制提交任务的包装方式,从而让您返回可比较的实例,并且可以轻松地通过自定义优先级队列进行排序。 - gsteff
6个回答

16

我已以合理的方式解决了这个问题,并将在下面描述它,以供我自己和遇到Java并发库中此问题的任何人参考。

使用PriorityBlockingQueue作为保存任务以便稍后执行的手段确实是朝着正确方向迈出的一步。但问题在于,PriorityBlockingQueue必须泛型实例化为包含Runnable实例,而无法对Runnable接口调用compareTo(或类似方法)。

解决问题的关键是,创建Executor时必须给它一个PriorityBlockingQueue,并且进一步给队列提供一个自定义的比较器来进行适当的就地排序:

new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());

现在,让我们来看一下CustomTaskComparator

public class CustomTaskComparator implements Comparator<MyType> {

    @Override
    public int compare(MyType first, MyType second) {
         return comparison;
    }

}

到目前为止,一切看起来都很简单明了。但是接下来就有点棘手了。我们下一个问题是如何处理从Executor创建FutureTasks的问题。在Executor中,我们必须像这样重写newTaskFor

@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
    //Override the default FutureTask creation and retrofit it with
    //a custom task. This is done so that prioritization can be accomplished.
    return new CustomFutureTask(c);
}

在这里,c 是我们要执行的 Callable 任务。现在,让我们来看一下 CustomFutureTask

public class CustomFutureTask extends FutureTask {

    private CustomTask task;

    public CustomFutureTask(Callable callable) {
        super(callable);
        this.task = (CustomTask) callable;
    }

    public CustomTask getTask() {
        return task;
    }

}

注意getTask方法。我们稍后会使用它从我们创建的CustomFutureTask中获取原始任务。

最后,让我们修改我们试图执行的原始任务:

public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {

    private final MyType myType;

    public CustomTask(MyType myType) {
        this.myType = myType;
    }

    @Override
    public MyType call() {
        //Do some things, return something for FutureTask implementation of `call`.
        return myType;
    }

    @Override
    public int compareTo(MyType task2) {
        return new CustomTaskComparator().compare(this.myType, task2.myType);
    }

}

你可以看到我们在任务中实现了Comparable,以委托给实际的MyType Comparator

这样就完成了使用Java库定制执行器的优先级!需要一些技巧,但这是我能够想出的最简洁的方法。希望对某人有所帮助!


1
这种机制存在一些固有的限制。例如,传递给执行器的第一个可运行/可调用对象不会进入队列。因此,优先级机制仅在任务排队时才会应用,而当当前的运行线程数量超过池大小(此处为1)时就会发生这种情况。 - Snicolas
1
在CustomTask中,您不应为每个比较实例化一个对象,这会使事情变得非常缓慢。 - Snicolas
getTask被用在哪里? - Mingjiang Shi

8
乍一看,似乎可以为任务定义一个接口,该接口扩展了RunnableCallable<T>Comparable。然后,使用PriorityBlockingQueue作为队列包装ThreadPoolExecutor,并仅接受实现您的接口的任务。
考虑到您的评论,看起来有一种选择是扩展ThreadPoolExecutor,并覆盖submit()方法。请参阅AbstractExecutorService以查看默认方法的外观;它们所做的只是将RunnableCallable包装在FutureTask中并执行它们。我可能会编写一个包装器类来实现ExecutorService并委托给匿名内部ThreadPoolExecutor。将它们包装在具有优先级的东西中,以便您的Comparator可以访问它。

2
这也是我的看法,但问题在于; 传递到优先队列中的“Runnable”实例并不是我直接“submit”的任务,它们被包装在一个java.util.concurrent.FutureTask<V>中,而这当然不能按相同的方式排序。如果我使用execute-例如不接受Callable-那么它会将我的对象抛出来。 - Chris R
我得说,我还在苦苦挣扎,但是...... 嗯,可以说这真是一件痛苦的事情 :) - Chris R
Adam,看起来你就坐在我身后,因为这就是我刚刚做的事情。现在值得注意的是,这并不能解决饥饿问题。我不太确定如何解决这个问题,但这是我必须考虑的事情。 - Chris R
你知道伟大思想家的说法吧?饥饿问题很复杂。我能想到的唯一办法,而且可能会变得非常混乱,就是使用某种老化算法,不时地增加所有任务的优先级,并在某种方式下更新队列。不幸的是,那实现起来说起来容易做起来难。 - Adam Jaskiewicz
1
逻辑是如果你增加所有任务的优先级,然后对队列进行堆化(heapify),你并没有改变现有任务之间的优先级关系,但是在队列中已经存在的任务会比新增加的任务具有更高的优先级。不过,我认为要以无锁的方式实现这个功能并不容易。 - Adam Jaskiewicz
显示剩余2条评论

4

我将尝试通过完全的功能代码来解释这个问题。但在深入代码之前,我想先介绍一下PriorityBlockingQueue。

PriorityBlockingQueue:PriorityBlockingQueue是一个BlockingQueue的实现。它接受具有优先级的任务并提交具有最高优先级的任务首先执行。如果两个任务具有相同的优先级,则需要提供一些自定义逻辑来决定哪个任务先执行。

现在让我们直接进入代码。

Driver类:这个类创建了一个executor,接受任务并将它们提交给执行器执行。这里我们创建了两个任务,一个具有LOW优先级,另一个具有HIGH优先级。我们告诉执行器运行最多1个线程,并使用PriorityBlockingQueue。

     public static void main(String[] args) {

       /*
       Minimum number of threads that must be running : 0
       Maximium number of threads that can be created : 1
       If a thread is idle, then the minimum time to keep it alive : 1000
       Which queue to use : PriorityBlockingQueue
       */
    PriorityBlockingQueue queue = new PriorityBlockingQueue();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
        1000, TimeUnit.MILLISECONDS,queue);


    MyTask task = new MyTask(Priority.LOW,"Low");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.HIGH,"High");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.MEDIUM,"Medium");
    executor.execute(new MyFutureTask(task));

}

MyTask类:MyTask实现了Runnable接口,并在构造函数中接受优先级作为参数。当此任务运行时,它会打印一条消息,然后将线程休眠1秒。

   public class MyTask implements Runnable {

  public int getPriority() {
    return priority.getValue();
  }

  private Priority priority;

  public String getName() {
    return name;
  }

  private String name;

  public MyTask(Priority priority,String name){
    this.priority = priority;
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed "+getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

MyFutureTask类:由于我们使用PriorityBlockingQueue来保存任务,因此我们的任务必须包装在FutureTask中,而我们实现的FutureTask必须实现Comparable接口。Comparable接口比较2个不同任务的优先级,并提交具有最高优先级的任务以供执行。

 public class MyFutureTask extends FutureTask<MyFutureTask>
      implements Comparable<MyFutureTask> {

    private  MyTask task = null;

    public  MyFutureTask(MyTask task){
      super(task,null);
      this.task = task;
    }

    @Override
    public int compareTo(MyFutureTask another) {
      return task.getPriority() - another.task.getPriority();
    }
  }

优先级类:优先级类是一个自解释的概念。

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}

现在当我们运行这个示例时,会得到以下输出。
The following Runnable is getting executed High
The following Runnable is getting executed Medium
The following Runnable is getting executed Low

尽管我们先提交了 LOW 优先级的任务,但稍后又提交了 HIGH 优先级的任务,由于我们使用的是 PriorityBlockingQueue,任何具有更高优先级的任务都将首先执行。

创建High1、High2、Low1、Low2、Low3任务,并在优先级级别内随机执行它们。需要解决方案以保留相同优先级任务的提交顺序。 - Daniel Hári

4
你可以使用这些辅助类:
public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get(timeout, unit);
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

AND

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

而且 这个辅助方法:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

然后像这样使用AND

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

@Snicolas,你所做的更改无法编译。 - assylias
@assyslias,你使用哪个JDK版本? - Snicolas
我能找到的最好的文章在这里:http://binkley.blogspot.fr/2009/04/jumping-work-queue-in-executor.html。在JDK 1.7-1.6和1.5上获得可行的东西确实很棘手。 - Snicolas

1

我的解决方案保留了相同优先级任务的提交顺序。这是对答案的改进。

任务执行顺序基于以下几点:

  1. 优先级
  2. 提交顺序(在相同优先级内部)

测试类:

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1);

        //Priority=0
        executorService.submit(newCallable("A1", 200));     //Defaults to priority=0 
        executorService.execute(newRunnable("A2", 200));    //Defaults to priority=0
        executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0));

        //Priority=1
        executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1));
        executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1));
        executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1));

        executorService.shutdown();

    }

    private static Runnable newRunnable(String name, int delay) {
        return new Runnable() {
            @Override
            public void run() {
                System.out.println(name);
                sleep(delay);
            }
        };
    }

    private static Callable<Integer> newCallable(String name, int delay) {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println(name);
                sleep(delay);
                return 10;
            }
        };
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

}

结果:

A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8

第一个任务是A1,因为在插入队列时没有更高的优先级。B任务是1优先级,所以会先执行,A任务是0优先级,所以稍后执行,但执行顺序遵循提交顺序:B1、B2、B3、... A2、A3、A4...

解决方案:

public class PriorityExecutors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS);
    }

    private static class PriorityExecutor extends ThreadPoolExecutor {
        private static final int DEFAULT_PRIORITY = 0;
        private static AtomicLong instanceCounter = new AtomicLong();

        @SuppressWarnings({"unchecked"})
        public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
                    ComparableTask.comparatorByPriorityAndSequentialOrder()));
        }

        @Override
        public void execute(Runnable command) {
            // If this is ugly then delegator pattern needed
            if (command instanceof ComparableTask) //Already wrapped
                super.execute(command);
            else {
                super.execute(newComparableRunnableFor(command));
            }
        }

        private Runnable newComparableRunnableFor(Runnable runnable) {
            return new ComparableRunnable(ensurePriorityRunnable(runnable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new ComparableFutureTask<>(ensurePriorityCallable(callable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
        }

        private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) {
            return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable
                    : PriorityCallable.of(callable, DEFAULT_PRIORITY);
        }

        private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
            return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
                    : PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
        }

        private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
            private Long sequentialOrder = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;

            public ComparableFutureTask(PriorityCallable<T> priorityCallable) {
                super(priorityCallable);
                this.hasPriority = priorityCallable;
            }

            public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
                super(priorityRunnable, result);
                this.hasPriority = priorityRunnable;
            }

            @Override
            public long getInstanceCount() {
                return sequentialOrder;
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }
        }

        private static class ComparableRunnable implements Runnable, ComparableTask {
            private Long instanceCount = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;
            private Runnable runnable;

            public ComparableRunnable(PriorityRunnable priorityRunnable) {
                this.runnable = priorityRunnable;
                this.hasPriority = priorityRunnable;
            }

            @Override
            public void run() {
                runnable.run();
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }

            @Override
            public long getInstanceCount() {
                return instanceCount;
            }
        }

        private interface ComparableTask extends Runnable {
            int getPriority();

            long getInstanceCount();

            public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
                return (o1, o2) -> {
                    int priorityResult = o2.getPriority() - o1.getPriority();
                    return priorityResult != 0 ? priorityResult
                            : (int) (o1.getInstanceCount() - o2.getInstanceCount());
                };
            }

        }

    }

    private static interface HasPriority {
        int getPriority();
    }

    public interface PriorityCallable<V> extends Callable<V>, HasPriority {

        public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) {
            return new PriorityCallable<V>() {
                @Override
                public V call() throws Exception {
                    return callable.call();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

    public interface PriorityRunnable extends Runnable, HasPriority {

        public static PriorityRunnable of(Runnable runnable, int priority) {
            return new PriorityRunnable() {
                @Override
                public void run() {
                    runnable.run();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

}

更新:它与Guava的listeningDecorator无法一起使用.... :( - Daniel Hári

0

是否可以为每个优先级级别设置一个ThreadPoolExecutorThreadPoolExecutor可以使用ThreadFactory实例化,您可以拥有自己的ThreadFactory实现来设置不同的优先级级别。

 class MaxPriorityThreadFactory implements ThreadFactory {
     public Thread newThread(Runnable r) {
         Thread thread = new Thread(r);
         thread.setPriority(Thread.MAX_PRIORITY);
     }
 }

1
线程优先级在这里并不是很重要;任务本身往往会相对较快地执行(目标是使它们每个执行约50毫秒),因此线程调度不是很重要。问题在于任务之间的优先级相对关系。 - Chris R
它们必须按照特定的顺序执行吗? - willcodejavaforfood
没有一个真正的顺序,但是后来到达但优先级更高的任务应该比后来到达但优先级较低的任务先执行。再次强调需要一定的公平性保证,以防止饥饿问题。 - Chris R
将它们通过优先队列提交应该可以确保它们按正确顺序执行,所以我不太明白为什么你不能这样做。 - willcodejavaforfood
线程池执行器服务不支持任务优先级,这就是为什么你无法直接控制放置在队列中的对象类型。 - Chris R

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接