如何实现一个ExecutorService以轮换方式执行任务?

7

我正在使用java.util.concurrent.ExecutorService固定线程池来执行任务列表。我通常会有大约80-150个任务,我已经将同时运行线程的数量限制为10个,如下所示:

ExecutorService threadPoolService = Executors.newFixedThreadPool(10);

for ( Runnable task : myTasks ) 
{     
    threadPoolService.submit(task); 
}

我的使用情况要求即使已完成的任务也应该重新提交给 ExecutorService,但只有在所有已经提交的任务都被服务/完成时才会再次执行/获取。这基本上意味着已提交的任务应该按轮换方式执行。因此,在这种情况下不会有 threadPoolService.shutdown()threadPoolService.shutdownNow() 的调用。
我的问题是,如何实现按轮换方式服务于 ExecutorService 的任务?
4个回答

12

ThreadPoolExecutor提供了一个afterExecution的扩展点,您可以在其中将作业放回队列的末尾。

public class TaskRepeatingThreadPoolExecutor extends ThreadPoolExecutor {

    public TaskRepeatingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        this.submit(r);
    }
}

当然,如果没有ExecutorService的便捷工厂方法,您需要自己实例化它,但构造函数足够简单易懂。


感谢您的建议并让我知道了ThreadPoolExecutor类的存在。我在这里有一个问题,我需要在构造函数中传递BlockingQueue<Runnable> workQueue是什么/如何/为什么?在ExecutorService的情况下,我通常会像这样提交所有任务:threadPoolService.submit(task)。我无法理解关于BlockingQueue<Runnable> workQueue的内容。希望您能让我理解这个问题。 - Gnanam
刚刚了解了BlockingQueue<Runnable>,想在这里更新一下。BlockingQueue主要用于保存工作/任务,当池中的所有线程都忙于执行任务时,将其发送到执行器。 - Gnanam
我不确定这个技术是否有效,因为afterExecute是使用FutureTask(Runnable-实际任务的包装器)调用的,它具有内部状态,用于跟踪完成状态。如果重新提交,则只返回而不执行实际任务。 - Dhanaraj Durairaj

1
我建议使用标准库并发工具中完全存在的功能来解决问题。它使用一个带有任务装饰器类和屏障操作的循环屏障,重新提交所有任务。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Rotation {

    private static final class RotationDecorator implements Runnable {
        private final Runnable          task;
        private final CyclicBarrier barrier;


        RotationDecorator( Runnable task, CyclicBarrier barrier ) {
            this.task = task;
            this.barrier = barrier;
        }


        @Override
        public void run() {
            this.task.run();
            try {
                this.barrier.await();
            } catch(InterruptedException e) {
                ; // Consider better exception handling
            } catch(BrokenBarrierException e) {
                ; // Consider better exception handling
            }
        }
    }


    public void startRotation( List<Runnable> tasks ) {
        final ExecutorService threadPoolService = Executors.newFixedThreadPool( 10 );
        final List<Runnable> rotatingTasks = new ArrayList<Runnable>( tasks.size() );
        final CyclicBarrier barrier = new CyclicBarrier( tasks.size(), new Runnable() {
            @Override
            public void run() {
                Rotation.this.enqueueTasks( threadPoolService, rotatingTasks );
            }
        } );
        for(Runnable task : tasks) {
            rotatingTasks.add( new RotationDecorator( task, barrier ) );
        }
        this.enqueueTasks( threadPoolService, rotatingTasks );
    }


    private void enqueueTasks( ExecutorService service, List<Runnable> tasks ) {
        for(Runnable task : tasks) {
            service.submit( task );
        }
    }

}

1
你可以简单地检查所有任务是否已经执行并在这种情况下重新提交它们,例如像这样:
    List<Future> futures = new ArrayList<>();
    for (Runnable task : myTasks) {
        futures.add(threadPoolService.submit(task));
    }
    //wait until completion of all tasks
    for (Future f : futures) {
        f.get();
    }
    //restart
    ......

编辑
看起来您想在任务完成后立即重新提交任务。您可以使用ExecutorCompletionService,它使您能够在任务执行时检索任务,下面是一个简单的示例,其中包含2个任务,它们在完成后会被多次重新提交。样本输出:

任务1已提交到pool-1-thread-1
任务2已提交到pool-1-thread-2
任务1已完成pool-1-thread-1
任务1已提交到pool-1-thread-3
任务2已完成pool-1-thread-2
任务1已完成pool-1-thread-3
任务2已提交到pool-1-thread-4
任务1已提交到pool-1-thread-5
任务1已完成pool-1-thread-5
任务2已完成pool-1-thread-4

public class Test1 {

    public final ConcurrentMap<String, String> concurrentMap = new ConcurrentHashMap<>();
    public final AtomicInteger retries = new AtomicInteger();
    public final Object lock = new Object();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int count = 0;
        List<Runnable> myTasks = new ArrayList<>();
        myTasks.add(getRunnable(1));
        myTasks.add(getRunnable(2));
        ExecutorService threadPoolService = Executors.newFixedThreadPool(10);
        CompletionService<Runnable> ecs = new ExecutorCompletionService<Runnable>(threadPoolService);
        for (Runnable task : myTasks) {
            ecs.submit(task, task);
        }
        //wait until completion of all tasks
        while(count++ < 3) {
            Runnable task = ecs.take().get();
            ecs.submit(task, task);
        }
        threadPoolService.shutdown();
    }

    private static Runnable getRunnable(final int i) {
        return new Runnable() {

            @Override
            public void run() {
                System.out.println("Task " + i + " submitted " + Thread.currentThread().getName() + "  ");
                try {
                    Thread.sleep(500 * i);
                } catch (InterruptedException ex) {
                    System.out.println("Interrupted");
                }
                System.out.println("Task " + i + " completed " + Thread.currentThread().getName() + "  ");
            }
        };
    }
}

在我的情况下,我需要在每个任务完成时重新提交每个单独的任务,而不是“等待所有任务完成”。有任何想法/评论吗? - Gnanam
太棒了!这正是我在寻找的。谢谢你。 - dnim

1

答案更多地与用于ExecutorService实例的工作队列的实现相关。因此,我建议:

  1. 首先选择一个提供循环队列功能的{{link1:java.util.concurrent.BlockingQueue}}(示例)的实现。 注意,选择BlockingQueue的原因是要等待下一个任务被放入队列中;因此,在循环+阻塞队列的情况下,您应该注意如何提供相同的行为和功能。

  2. 不要使用Executors.new...创建新的ThreadPoolExecutor,而是使用直接构造函数,例如

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

这样,除非您命令执行器进行shutdown,否则它将尝试从其工作队列中获取下一个任务以执行,该队列是一个用于任务的循环容器。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接