有没有一种方法可以将ScheduledExecutorService与ExecutorCompletionService一起使用?

6
我正在尝试同时使用ExecutorCompletionService和ScheduledExecutorService。我需要做的是安排不同的活动,每个活动都有“执行前的延迟”并且根据上一次运行的结果“重新安排它们”(拥有不同的延迟时间)。
我的问题是,我无法在ExcecutorCompletionService submit中使用“延迟”。
我尝试了以下方法,但它会永远阻塞...
显然我忽略了Java语言中的一个基本问题。
是否有任何方式可以安排任务到ScheduledExecutorService,使CompletionService“知道它”?
public class Bar {

    private ScheduledExecutorService scheduledExecutor;
    private Future<Status> action1Future;
    private Future<Status> action2Future;
    private ExecutorCompletionService<Status> pool;
    private long delay1 = 10;
    private long delay2 = 20;
    private long delay3 = 30;

    public void start() {
        scheduledExecutor = Executors.newScheduledThreadPool(3);

        Action1 a1 = new ActionOne(); // Action1 implements Callable<Status>
        Action2 a2 = new ActionTwo(); // Action2 implements Callable<Status>

        pool = new ExecutorCompletionService<Status>(scheduledExecutor);
        action1Future = scheduledExecutor.schedule(a1, delay1, TimeUnit.SECONDS);
        action2Future = scheduledExecutor.schedule(a2, delay1, TimeUnit.SECONDS);
        monitorAndRestart();
    }

    private void monitorAndRestart() {
         boolean isDone=false;
        do {
        try {
            // THIS IS WHERE IT BLOCKS.
            Future<Status> processedItem = pool.get();
            if (processedItem == action1Future) {
                if (processedItem.get() == Status.GOOD) {
                    action1Future = scheduledExecutor.schedule(new ActionOne(), delay1, TimeUnit.SECONDS);
                } else {
                    action1Future = scheduledExecutor.schedule(new ActionOne(), delay2, TimeUnit.SECONDS);
                }
            } else if (processedItem == action2Future) {
                if (processedItem.get() == Status.GOOD) {
                    action1Future = scheduledExecutor.schedule(new ActionOne(), delay2, TimeUnit.SECONDS);
                } else {
                    action1Future = scheduledExecutor.schedule(new ActionOne(), delay3, TimeUnit.SECONDS);
                }
            }
        } catch (InterruptedException e) {
            isDone = true;
            // handle this.. shudown whatever
        }
        catch (ExecutionException e) {
            // handle this
        }
      } while (isDone == false);
    }

    public static void main(String[] args) {
        Bar myRunner = new Bar();
        myRunner.start();
    }
}

如果我在Callable中设置"delay",通过new ActionOne(delay)创建callable,并使用CompletionService.submit(..),它可以工作。
actionFuture1 = pool.submit(new ActionOne(delay1));
/////


public class ActionOne implements Callable<Status>(
   private final delay;
   public ActionOne(long dl) {
       delay=dl;
   }
   Status call() {
       try {
         Thread.sleep(delay * 1000); // seconds
          return doSomething()
       } catch (...) { //thread.sleep execptions}
   }
 }

所以我猜最终的问题是:使用ScheduledExecutorService相比于Thread.sleep(delay)的方式,有什么根本上更好的地方吗?

2个回答

1
我们在其中一个应用程序中进行了这种重新调度,我们采取了Ed Thomas建议的方式包装任务。(我们还通过传递返回任务下一次执行时间的迭代器来使重新调度变得非常灵活-允许我们使用许多不同的调度策略)。
另一个选项是子类化ScheduledThreadPoolExecutor并覆盖afterExecute。这可能比包装任务更清晰。

实际上,ExecutorCompletionService 没有做任何其他事情,只是包装了任务... - Holger
1
关于您的第二个选项,我只看到了 afterExecute(Runnable, Throwable)(为什么?!)。我猜想这个选项不能用于 Callable - Jakob

0

我认为CompletionService不会起作用,特别是当你尝试重新安排它时。你能否使用Guava库并切换到ListenableFutures?这将使您更容易自己构建。

对于另一种方法,请考虑让未来/任务自行重新安排。您甚至可以从原始任务的包装器中处理此操作(从您的代码很难确定最终目标)。

很难确定您需要什么样的模式,但也许您可以尝试类似以下的东西:

public static void scheduleRepeatedly(final Callable<Status> callable, final int someDelay, final ScheduledExecutorService executor) {
    executor.schedule(new Runnable() {
        @Override
        public void run() {
            Status status = Status.BAD;
            try {
                status = callable.call();
            } catch (Exception e) {
                // whatever logic here to determine if callable should be rescheduled and what the new delay is
                if (status == Status.BAD) {
                    scheduleRepeatedly(callable, someDelay * 2, executor);
                }
            }
        }
    }, someDelay, TimeUnit.SECONDS);
}

这正是我感兴趣的事情。我该如何让java.util.concurrent.Future<V>根据结果V重新安排自己 - JalapenoGremlin
谢谢。我认为这会让我朝着我想要做的方向前进。理想情况下,我真正想要的是 pool.submit(new MyCallable(), delay).. 这可能意味着我在做某些错误的事情。感谢您的帮助! - JalapenoGremlin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接