如何设计这个多线程编程场景?

3
我正在尝试使用多线程遍历树形结构。问题在于,没有HTTP调用我们就无法知道树的结构(即,HTTP调用将为您提供节点的子项)。因此,我正在尝试使用多线程以增加我们可以进行的HTTP请求的吞吐量。
我不知道我们应该如何优雅地解决这个问题,因此我首先尝试在这里描述我的想法。
最初,我认为这将类似于我们通常编写的BFS,假设我们具有10的并发级别。
SemaphoreSlim semaphore = new SemaphoreSlim(10);

Task HTTPGet(Node node) {
  blah blah
  Q.push(childNodes);
}

while (!Q.isEmpty()) {
    Node node = Q.head();
    Q.pop();
    taskList.Add(Task.Start(() => HTTPGet(node));
}

这里的问题是:处理完第一个节点后,Q变为空,整个循环也会终止。因此我认为我们需要检查信号量的剩余计数。如果剩余计数不为10,则表示某些进程仍在运行,我们应该等待其完成。

while (!Q.isEmpty() || semaphore.Count != 10) {
    Node node = Q.head();
    Q.pop();
    taskList.Add(Task.Start(() => HTTPGet(node));
}

但是显然在第一个节点弹出后,Q仍然是空的,我们需要在while循环内部进行一些“等待”,以确保我们可以获取该节点。

while (!Q.isEmpty() || semaphore.Count != 10) {
    if (Q.isEmpty()) {
       Wait till Q becomes non empty
       or semaphore.Count == 10 again
    }
    Node node = Q.head();
    Q.pop();
    taskList.Add(Task.Start(() => HTTPGet(node));
}

但是这种方法变得非常丑陋,我相信有更好的解决方法。我试图用生产者-消费者模式来阐述它,但失败了(因为这次消费者也会启动更多的生产者)。

有没有更好的方法来阐述这个问题?


树状结构是否保证为非循环的,或者子节点是否可能指向父节点(或更高层级节点),就像普通网站一样? - Matt Jordan
@MattJordan:结构保证是无环的。 - derekhh
2个回答

1

虽然通过代码更容易解释,但请注意我没有尝试或测试过这个。这只是为了让您开始正确的路径。

class Program {

    static void Main(string[] args) {
        new Program();
    }

    Program() {
        Node root = new Node("root");
        root.Children = new Node[2];
        root.Children[0] = new Node("child0");
        root.Children[1] = new Node("child1");

        MultiThreadedBFS(root);

    }


    BlockingCollection<Node> Queue = new BlockingCollection<Node>(10); // Limit it to the number of threads

    Node[] HTTPGet(Node parentNode) {
        return parentNode.Children; //your logic to fetch nodes go here
    }

    volatile int ThreadCount;

    void MultiThreadedBFS(Node root) {
        Queue.Add(root);

        // we fetch each node's children on a separate thread. 
        // This means that when all nodes are fetched, there are 
        // no more threads left. That will be our exit criteria
        ThreadCount = 0;

        do {
            var node = FetchNextNode();
            if (node == null)
                break;

            ProcessNode(node);
        } while (true);

    }

    Node FetchNextNode() {
        Node node;
        while (!Queue.TryTake(out node, 100)) {
            if (ThreadCount == 0 && Queue.Count == 0)
                return null; // All nodes have been fetched now
        }

        return node;
    }

    void ProcessNode(Node node) {
        // you can use a threadpool or task here
        new Thread(() => {
            Thread.CurrentThread.Name = "ChildThread";

            ++ThreadCount;
            Debug.WriteLine("Retrieving children for Node: " + node);
            var children = HTTPGet(node);
            foreach (var child in children) {
                Debug.WriteLine("Adding node for further processing: " + node);
                while (!Queue.TryAdd(child, -1))
                    ;
            }

            --ThreadCount;

        }).Start();
    }

    // this is the actual node class that represents the Node on the tree
    [DebuggerDisplay("Name = {Name}")]
    class Node {
        public string Name;
        public Node[] Children = new Node[0];

        public Node(string name) {
            Name = name;
        }

        public override string ToString() {
            return Name;
        }
    }

}

编辑:

我现在已经更新了程序,修复了退出条件和一些其他错误

此外,尽管我在这里使用线程,但我认为这是一个完美的使用async/await的情况。我会让其他人回答如何使用async/await。


谢谢!我有一个快速的问题 - 假设我们已经到达了叶子节点,并且已经将叶子节点推送到队列中。现在正在处理叶节点的线程已经启动,因此循环再次运行到“while(!Queue.TryTake(out node,-1))”语句。由于它在叶子级别,因此不会将任何子项推送到队列中。因此,无论等待多长时间,循环都将在“TryTake”语句处卡住? - derekhh
@derekhh,我刚刚更新了代码以修复你指出的问题,还修复了其他退出条件的错误。 - Vikhram

0

在从队列获取之前,让每个线程增加一个正在从队列中取出的线程计数器。如果该计数器恰好达到线程数,则表示所有线程都在尝试出队,不可能有任何操作正在进行。在这种情况下,通知所有线程退出。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接