并行执行图节点(任务)并找到关键任务

4
我使用Java实现了一个有向图。它用于项目计划,并且每个节点表示具有不同属性的任务。我已经成功地实现了拓扑排序,但我需要一种方法来运行/执行并行任务,只要任务的依赖关系完成即可。
这是我的实现:
import java.util.ArrayList;
import java.util.List;
import java.util.*;

public class Task implements Comparable<Task> {
    int number;
    String name;
    int time;
    int staff;
    int earliestStart, latestStart;
    List<Integer> dependencies;
    List<Task> outEdges;
    int cntPredecessors;
    Status status;
    public enum Status {UNVISITED,RUNNING,VISITED};
    @Override
    public String toString() {
        return "Task{" +
                "number=" + number +
                ", name='" + name + '\'' +
                ", time=" + time +
                ", staff=" + staff +
                ", dependencies=" + dependencies +
                '}';
    }

    public Task(int number, String name, int time, int staff) {
        setNumber(number);
        setName(name);
        setTime(time);
        setStaff(staff);
        dependencies=new ArrayList<>();
        outEdges=new ArrayList<>();
        status = Status.UNVISITED;
     }

    public int getNumber() {
        return number;
    }

    public void setNumber(int number) {
        this.number = number;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getTime() {
        return time;
    }

    public void setTime(int time) {
        this.time = time;
    }

    public int getStaff() {
        return staff;
    }

    public void setStaff(int staff) {
        this.staff = staff;
    }

    public List<Integer> getDependencies() {
        return dependencies;
    }

    public void setDependencies(List<Integer> dependencies) {
        this.dependencies = dependencies;
    }

    public List<Task> getOutEdges() {return outEdges; }

    public void setOutEdge(Task t) {outEdges.add(t); }



    public int getIndegrees() { return cntPredecessors; }

    public void setIndegree() { cntPredecessors = dependencies.size();}

    public Status getStatus() {return this.status; }

    public Task findTaskWithNoInDegree() {
            if (this.cntPredecessors == 0) return this;
            return null;
    }
    

    public int compareTo(Task other) {

        return Integer.compare(this.time, other.time);

        }
} //END class Task

// The class Main represents the Task objects in a graph

import java.util.*;

public class Main {
    static int maxnr = 0;

    public static void main(String[] args) {
        Map<Integer, Task> map=new HashMap<>();
        Scanner scanner = new Scanner(Main.class.getResourceAsStream("/res/house.txt"), "UTF-8");
        
        Main mainObject = new Main();
        map = mainObject.fromScanner(scanner);
        System.out.println("DEBUG: maxnr " + maxnr);
        mainObject.setInDegrees(map);
        mainObject.setOutEdges(map);

        //System.out.println("DEBUG: Size of outEdges for Task 1 is : " + map.get(1).getOutEdges().size());
        //System.out.println("DEBUG: Indegrees for Task 8 is : " + map.get(8).getIndegrees());
        mainObject.topSort(maxnr,map);

        for(Integer k:map.keySet()) {
            //System.out.println("DEBUG outEdges for Task number " + map.get(k).getNumber() + " " + map.get(k).getOutEdges());
        }

    } // END of void main(String[] args)


    public void setInDegrees(Map<Integer, Task> map) {
        for(Integer k:map.keySet()) {
            Task task = map.get(k);
            task.setIndegree();
        }
    }


    public void setOutEdges(Map<Integer, Task> map) {
        for(Integer k:map.keySet()) {
            // map.get(k).setIndegrees();
            for(Integer dep:map.get(k).getDependencies()) {
                //System.out.println("DEBUG: "+ dep);
                //System.out.print(" DEBUG:  Name is "  + map.get(dep).getName());
                map.get(dep).setOutEdge(map.get(k));
            }
            //System.out.println(map.get(k));
        }
    } //END of setOutEdges()
        // toplogical sort # Big O(|V| +|E|)  for indegree calc and since the code only looks at each edge once!
        // S is Set of all nodes with no incoming edges
    public void topSort(int maxnr, Map<Integer, Task> map) {
        ArrayList<Task> L = new ArrayList<Task>(maxnr);
        //LinkedList<Task> L = new LinkedList<>();
        //HashSet<Task> S = new HashSet<>(maxnr);
        LinkedList<Task> S = new LinkedList<>();

        for(Integer n:map.keySet()) {
            if(map.get(n).getIndegrees() == 0) {
                S.add(map.get(n));
            }
        }
        System.out.println("DEBUG: Set S is " + S);
        //HashSet<Task> S2 = new HashSet<>(S);

        Task t;
        int counter= 0;
        while(!S.isEmpty()) {
            //System.out.print("Topsort: Task and S. " + t.getNumber());
            t = S.iterator().next();
            S.remove(t);
            //System.out.print("Topsort : " + t.getNumber());
            L.add(t);
            //System.out.println("Starting " + t.getNumber());
            counter++;

            for(Iterator<Task> it = t.outEdges.iterator(); it.hasNext();) {
                Task w =  it.next();
                w.cntPredecessors--;
                if (w.getIndegrees() == 0) {
                    S.add(w);
                   // System.out.println("Starting " + w.getNumber());
                }
            }
        }
        System.out.println();

        if (counter < maxnr) {
            System.out.println("Cycle detected, topsort not possible");
        } else {
            //System.out.println("Topsort : " + Arrays.toString(L.toArray()));
            Iterator<Task> topsortIt = L.iterator();
            System.out.print("\n Topsort list is: ");
            while (topsortIt.hasNext()) {
                System.out.print(" " + topsortIt.next().getNumber());
            }
            System.out.println();
        }
    } //END of topSort()

    public Map fromScanner(Scanner scanner) {
    Map<Integer, Task> map=new HashMap<>();
    maxnr = scanner.nextInt();
    while (scanner.hasNextLine()) {
        String line=scanner.nextLine();
        if (line.isEmpty() ) continue;
        Scanner s2=new Scanner(line);
        Task task = new Task(s2.nextInt(), s2.next(), s2.nextInt(), s2.nextInt());
        while (s2.hasNextInt()) {
            int i = s2.nextInt();
            if (i != 0) {
                task.getDependencies().add(i);
            }
        }
        map.put(task.getNumber(), task);
    }
    return map;
    } //END of fromScanner()

    } //END of class Main

house.txt文件内容:第一行(数字)表示最大节点/任务数量。列包括:任务编号、名称、完成所需时间、人力需求和依赖边缘(以0结束)。

8
1   Build-walls     4 2       5       0
2   Build-roofs     6 4       1       0
3   Put-on-wallpapers   1 2       1       2       0
4   Put-on-tiles        1 3       2       0
5   Build-foundation    4 2       0
6   Make-floor          2 2       5       0
7   Put-carpet-floor    4 2       6       2       0
8   Move-in         4 4       3       7       0

没有前置任务(即无依赖项)的任务应该首先启动。 例如,对于上面给定的输入,任务的执行可以如下打印:

Time: 0      Starting: 5   // Task 5 only one with no  dependencies
        Current staff: 2 

Time: 4      Finished: 5
             Starting: 6
             Starting: 1
        Current staff: 4   // sum of manpower from Task 6 and 1 => 2 +  2 = 4

Time: 6     Finished: 6
       Current staff: 2    

Time: 8     Finished: 1
            Finished: 1  
            Starting: 2
            Starting: 3 
       Current staff: 6

etc.


如果Task实现了Runnable接口,你可以使用ExecutorService来运行它。 - Fildor
Fildor:不幸的是,我必须手动实现算法,无法使用Runnable。 - Susinthiran
那么请您详细说明在您的情况下“执行任务”确切含义是什么。您是想计算每个任务的离散开始和结束时间,并找出关键路径吗? - Fildor
如果图中存在循环,topSort() 将检测到并给出一个消息,说明该项目无法实现。应尽快启动每个任务,即在完成所有依赖任务后立即启动。没有依赖关系的任务应立即启动。输出应通过打印重要信息来提供,即任务何时启动和/或完成。您的系统还应在这些时间点打印当前工作人员。对于我的任务执行,系统反馈应该是类似于任务执行顺序列表中给出的内容 :) - Susinthiran
这是关于根据节点时间和考虑依赖关系来优化遍历的问题 :) - Susinthiran
这个回答解决了你的问题吗?有向无环图任务的并行执行 - Anmol Singh Jaggi
3个回答

1

有几种方法。

您可以使用Java 8的CompletableFuture或Guava的ListenableFuture来实现它。

您的任务应该实现RunnableCallable。当您执行当前任务时,您应该递归地执行其preTasks(如果任务是组合模式,则它将知道它的前任),并确保它们完成(通过标志表达状态或其他方式)。

像这样:

CompletableFuture.allOf(predecessor).thenRunAsync(current)...
Futures.allAsList(predecessor)...

1
如果有人需要帮助,这正是JavaRed Library的使用案例-在定义为图形的流中安排和运行异步任务。
简而言之,像这样复杂的图形流程: Graph execution flow 可以通过以下方式简单实现:
Result<String> aResult = produceFutureOf(String.class).byExecuting(() -> executeA());
Result<String> bResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeB(a));
Result<String> cResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeC(a));
Result<String> eResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeE(a));
Result<String> dResult = ifResults(bResult, cResult).succeed().produceFutureOf(String.class).byExecuting((b, c) -> executeD(b, c));
Result<String> fResult = ifResult(eResult).succeed().produceFutureOf(String.class).byExecuting(e -> executeF(e));
Result<String> gResult = ifResult(fResult).succeed().produceFutureOf(String.class).byExecuting(f -> executeG(f));
return ifResults(dResult, gResult).succeed().produceFutureOf(String.class).byExecuting((d, g) -> executeH(d, g));

更多相关信息请参见项目维基


0

要安排您的任务,您可以自己维护线程池,或者使用库来完成。如果您自己维护线程池,那么非常简单(请原谅我糟糕的伪代码):

assume you already have tasks topoligically sorted : t0,t1,t2,....,tn
int next_task=0;  //global pointer to next task

then in each thread, you do:
    while (true) {
        atomic {
            if (next_task > n) break;
            t = get_task (next_task);
            next_task = next_task + 1;
        }
        run task t;
    }

这种方式可以按照依赖关系的顺序执行所有任务,每个线程完成最后一个任务后就会立即转到下一个任务。

如果您想要一个库来为您安排时间表,也许可以考虑使用OpenMP、TBB或查看此线程 如何在Java中实现类似DAG的调度程序??

希望这有所帮助。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接