Java Executors:我该如何设置任务优先级?

44

是否有可能为执行器(Executors)执行的任务设置优先级?我在JCIP中找到了一些关于这个问题的说法,它可能是可行的,但我找不到任何例子,并且文档中也没有相关内容。

来自JCIP:

执行策略指定任务执行的“何时、何地、如何以及如何”等内容,包括:

  • ...
  • 任务应按什么顺序执行(FIFO、LIFO、优先级顺序)?
  • ...

更新:我意识到我所问的并不完全是我想问的。我真正想知道的是:

如何使用/模拟使用执行器框架(executors framework)设置线程优先级(即类似于thread.setPriority())。

8个回答

76

5
+1 优先级阻塞队列是可行的方法。您可以实现 Comparator 或使任务本身 Comparable。 - Tim Bender
4
这篇文章是一个很好的参考资料:http://binkley.blogspot.fr/2009/04/jumping-work-queue-in-executor.html - Snicolas
我的解决方案可以按优先级对任务进行排序,但同时保留相同优先级水平的提交顺序:https://dev59.com/d3RA5IYBdhLWcg3w1BqW#42831172 - Daniel Hári

51

这里的想法是在执行器中使用PriorityBlockingQueue。为此:

  • 创建一个比较器来比较我们的futures。
  • 创建一个Future代理来保存优先级。
  • 重写'newTaskFor'以便将每个future都包装在我们的代理中。

首先,您需要在您的future上保持优先级:

    class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get();
    }

    public void run() {
        src.run();
    }
}

接下来您需要定义一个比较器,以正确排序优先级未来:

class PriorityFutureComparator implements Comparator<Runnable> {
    public int compare(Runnable o1, Runnable o2) {
        if (o1 == null && o2 == null)
            return 0;
        else if (o1 == null)
            return -1;
        else if (o2 == null)
            return 1;
        else {
            int p1 = ((PriorityFuture<?>) o1).getPriority();
            int p2 = ((PriorityFuture<?>) o2).getPriority();

            return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
        }
    }
}

接下来,假设我们有一个类似于这样的长时间任务:

class LenthyJob implements Callable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

然后为了按优先级执行这些作业,代码将如下所示:
public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int nThreads = 2;
        int qInitialSize = 10;

        ExecutorService exec = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(qInitialSize, new PriorityFutureComparator())) {

            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
                return new PriorityFuture<T>(newTaskFor, ((LenthyJob) callable).getPriority());
            }
        };

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

这是很多代码,但这几乎是实现它的唯一方法。
在我的机器上输出如下:
Scheduling: 39
Scheduling: 90
Scheduling: 88
Executing: 39
Scheduling: 75
Executing: 90
Scheduling: 15
Scheduling: 2
Scheduling: 5
Scheduling: 24
Scheduling: 82
Scheduling: 81
Scheduling: 3
Scheduling: 23
Scheduling: 7
Scheduling: 40
Scheduling: 77
Scheduling: 49
Scheduling: 34
Scheduling: 22
Scheduling: 97
Scheduling: 33
Executing: 2
Executing: 3
Executing: 5
Executing: 7
Executing: 15
Executing: 22
Executing: 23
Executing: 24
Executing: 33
Executing: 34
Executing: 40
Executing: 49
Executing: 75
Executing: 77
Executing: 81
Executing: 82
Executing: 88
Executing: 97

2
虽然“被采纳的答案”确实回答了问题,但这个提供了一个可行的解决方案。非常感谢。 - m02ph3u5
谢谢您的回答。使用ExecutorCompletionService是否可行?我尝试在ExecutorCompletionService构造函数中传入ExecutorService对象,但是无法将结果强制转换为比较器中的PriorityFuture。 - Arash
我在我的电脑上进行了测试。结果不正确。在我的电脑上,我得到了 "执行72" 在 "执行3" 之前的结果,这显然是错误的。 - Farhan stands with Palestine
我能找到的最干净的Callable Jobs解决方案,谢谢。 - Raghu
你好,我在使用上述代码时遇到了类型转换错误...int p1 = ((PriorityFuture<?>) o1).getPriority();java.util.concurrent.FutureTask 无法转换为 com.i2c.loyalty.handlers.datasynchandler.utils.InstancePriorityFuture这个 InstancePriorityFuture 实现了 RunnableFuture,请问有人可以帮忙吗? - Adnan Amman Ullah

4
您可以实现自己的ThreadFactory并将其设置在ThreadPoolExecutor中,如下所示:
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, numOfWorkerThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
threadPool.setThreadFactory(new OpJobThreadFactory(Thread.NORM_PRIORITY-2));

我的OpJobThreadFactory如下所示:

public final static class OpJobThreadFactory implements ThreadFactory {
   private int priority;
   private boolean daemon;
   private final String namePrefix;
   private static final AtomicInteger poolNumber = new AtomicInteger(1);
   private final AtomicInteger threadNumber = new AtomicInteger(1);

   public OpJobThreadFactory(int priority) {
      this(priority, true);
   }

   public OpJobThreadFactory(int priority, boolean daemon) {
      this.priority = priority;
      this.daemon = daemon;
      namePrefix = "jobpool-" +poolNumber.getAndIncrement() + "-thread-";
   }

   @Override
   public Thread newThread(Runnable r) {
      Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
      t.setDaemon(daemon);
      t.setPriority(priority);
      return t;
   }
}

3

3
你可以在 ThreadPoolExecutor 构造函数(或 Executors 工厂方法)中指定一个 ThreadFactory。这样就可以为执行器提供给定线程优先级的线程。
要为不同的作业获得不同的线程优先级,需要将它们发送到具有不同线程工厂的执行器中。

1

2
注释是注释,答案是答案。注释不是答案,答案也不是注释。如果它没有回答被问的问题,那么它实际上是一条注释。 - Engineer
1
+1 @Nick - 哈哈,太棒了!为什么用一个词,当你可以用一个冗长、讽刺的评论呢?好观点,说得好(有点儿厚颜无耻)。 - TedTrippin

0

只想为这个讨论贡献一点我的想法。我已经实现了ReorderingThreadPoolExecutor来达到一个非常具体的目的,那就是能够在不处理优先级(可能导致死锁并且不稳定)的情况下,随时将执行器的阻塞队列(在此情况下为LinkedBlockingDeque)中的任务明确地置顶。

我在Android应用程序中使用它来管理需要下载许多在长列表视图中显示的图片的情况。每当用户快速向下滚动时,执行器队列就会充斥着图片下载请求:通过将最新的请求移动到队列的顶部,我已经实现了更好的性能,在加载实际上在屏幕上显示的图像时,延迟下载可能稍后需要的图像。请注意,我使用内部并发映射键(可以简单地表示为图片URL字符串)将任务添加到执行器中,以便稍后可以检索它们进行重新排序。

实际上完成同样的任务还有很多其他方法,可能有点复杂,但它能正常工作,而且Facebook在其Android SDK中也类似地在其独立的工作线程队列中执行操作。

请随意查看代码并给予建议,它位于一个Android项目中,但去除一些日志和注释后,该类就是纯Java 6。


感谢您发现了这个问题,@Robert。我已经更新了链接到正确的URL。 - fast3r

0

如果只是想优先考虑一个线程而不是保证任何顺序,那么可以在Callable中传递set Thread priority,在call()方法开始时设置:

private int priority;

MyCallable(int priority){
this.priority=priority;
}

public String call() {

     logger.info("running callable with priority {}", priority);
     Thread.currentThread().setPriority(priority);

// do stuff

     return "something";
}

尽管如此,仍然依赖于底层实现来遵守线程优先级。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接