如何在Java中实现类似DAG的调度器?

5

我希望在Java中实现一个简单的DAG调度器(不需要结果),如下图所示:

DAG-like scheduling

我可以使用手动代码来实现这个功能:

ExecutorService executor = Executors.newCachedThreadPool();
Future<?> futureA = executor.submit(new Task("A"));
Future<?> futureC = executor.submit(new Task("C"));
futureA.get();
Future<?> futureB = executor.submit(new Task("B"));
futureB.get();
futureC.get();
Future<?> futureD = executor.submit(new Task("D"));
futureD.get();

但我正在寻求一种更通用的方法来做到这一点,以便我可以像这样使用调度器:

Container container = new Container();
container.addTask("A", new Task("A"));
container.addTask("B", new Task("B"), "A");
container.addTask("C", new Task("C"));
container.addTask("D", new Task("D"), "B", "C");
container.waitForCompletion();

实际上,我已经实现了一个简单的调度器:

https://github.com/jizhang/micro-scheduler/blob/master/src/main/java/com/shzhangji/micro_scheduler/App.java

但是,我需要每100毫秒迭代所有任务,以确定哪个任务可以提交。此外,这个实现没有异常检查。

我也尝试使用Guava库的ListenableFuture,但我不知道如何正确使用它。

如果您有关于如何实现DAG或推荐现有开源调度程序的建议,将不胜感激。


如果您只想并发运行任务 ABC,然后等待它们完成,为什么不使用 invokeAll 呢? - Boris the Spider
2
@BoristheSpider B 依赖于 A,且可能存在更复杂的图形。 - Jerry
那是一个非线性管道。查找管道模式,有几个Java库实现了它。 - tom
4个回答

7
你所需要的可以使用Google的Guava库及其可监听的Future接口来完成。ListenableFutures允许您拥有复杂的异步操作链。您应该实现一个可监听的Future,以便在任务B和C完成后使用allAsList方法执行任务D。
Listenable Futures的文档: https://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained Listenable Futures的教程: http://www.javacodegeeks.com/2013/02/listenablefuture-in-guava.html 使用allAsList、chain和transform方法的示例: http://codingjunkie.net/google-guava-futures/

1
感谢这些文章,我写了一个类似的:https://github.com/jizhang/micro-scheduler/blob/master/src/main/java/com/shzhangji/micro_scheduler/Container.java ,并且它可以正常工作。 - Jerry

6

Dexecutor(免责声明:我是所有者)是您寻找的库,这里是一个例子:

public class WorkFlowManager {

    private final Dexecutor<String, Boolean> dexecutor;

    public WorkFlowManager(ExecutorService executorService) {
        this.dexecutor = buildDexecutor(executorService);

        buildGraph();
    }

    private Dexecutor<String, Boolean> buildDexecutor(final ExecutorService executorService) {
        DexecutorConfig<String, Boolean> config = new DexecutorConfig<>(executorService, new WorkFlowTaskProvider());
        return new DefaultDexecutor<>(config);
    }

    private void buildGraph() {
        this.dexecutor.addDependency(TaskOne.NAME, TaskTwo.NAME);
        this.dexecutor.addDependency(TaskTwo.NAME, TaskThree.NAME);
        this.dexecutor.addDependency(TaskTwo.NAME, TaskFour.NAME);
        this.dexecutor.addDependency(TaskTwo.NAME, TaskFive.NAME);
        this.dexecutor.addDependency(TaskFive.NAME, TaskSix.NAME);
        this.dexecutor.addAsDependentOnAllLeafNodes(TaskSeven.NAME);
    }

    public void execute() {
        this.dexecutor.execute(ExecutionConfig.TERMINATING);
    }
}

将建立以下图表,然后执行将相应地进行。 Dexecutor Graph

有关更多详细信息,请参见如何我?

Dexecutor的原因

  • 超轻量级
  • 超快速
  • 支持立即/定时重试逻辑
  • 支持非终止行为
  • 有条件跳过任务执行
  • 良好的测试覆盖率,使您免受伤害
  • 可在Maven中央库中使用
  • 良好的文档
  • 分布式执行支持(Ignite、Hazelcast、Infinispan)

有用链接


0
我们可以使用Java8的CompletableFuture来解决您的问题,核心代码如下:
for (Task dependedentTask : dependedentTasks) {
    // get dependedentFuture by dependedentTask,for example
    CompletableFuture dependedentFuture = futureMap.get(dependedentTask);
    dependedentFutures.add(dependedentFuture);
}
CompletableFuture.allOf(dependedentFutures).thenRunAsync(current task execute it's method)...

希望有人能写这段代码。

0
另一个可能的方向是JavaRed Library,它提供了一个接口,用于编写和定义异步图流,并具有类似同步的语法。
例如,像这个复杂的图流: Graph execution flow 可以简单地实现为:
Result<String> aResult = produceFutureOf(String.class).byExecuting(() -> executeA());
Result<String> bResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeB(a));
Result<String> cResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeC(a));
Result<String> eResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeE(a));
Result<String> dResult = ifResults(bResult, cResult).succeed().produceFutureOf(String.class).byExecuting((b, c) -> executeD(b, c));
Result<String> fResult = ifResult(eResult).succeed().produceFutureOf(String.class).byExecuting(e -> executeF(e));
Result<String> gResult = ifResult(fResult).succeed().produceFutureOf(String.class).byExecuting(f -> executeG(f));
return ifResults(dResult, gResult).succeed().produceFutureOf(String.class).byExecuting((d, g) -> executeH(d, g));

关于此更多的信息请访问项目维基


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接