使用具有未知大小的队列的网络爬虫的生产者/消费者

5
我需要爬取父网页及其子网页,我遵循了http://www.albahari.com/threading/part4.aspx#%5FWait%5Fand%5FPulse中的生产者/消费者概念。同时,我使用了5个线程来对链接进行入队和出队操作。
如果队列的长度未知,您有什么建议可以让我在所有线程完成队列处理后结束/合并所有线程?
以下是我的编码思路。
static void Main(string[] args)
{
    //enqueue parent links here
    ...
    //then start crawling via threading
    ...
}

public void Crawl()
{
   //dequeue
   //get child links
   //enqueue child links
}
4个回答

3
如果所有线程都处于空闲状态(即等待队列),并且队列为空,则表示完成。
处理这种情况的简单方法是让线程在尝试访问队列时使用超时。类似于 BlockingCollection.TryTake。每当TryTake超时时,线程会更新一个字段以指示它已经空闲了多长时间:
while (!queue.TryTake(out item, 5000, token))
{
    if (token.IsCancellationRequested)
        break;
    // here, update idle counter
}

您可以设置一个每15秒执行一次的计时器,以检查所有线程的空闲计数器。如果所有线程都已经空闲了一段时间(比如一分钟),那么计时器就可以设置取消标记。这将杀死所有线程。您的主程序也可以监视取消标记。
顺便说一下,您可以在不使用BlockingCollection和取消的情况下完成此操作。您只需要创建自己的取消信号机制,如果您正在队列上使用锁,则可以将锁语法替换为Monitor.TryEnter等。
还有其他几种处理此问题的方法,但它们需要对您的程序进行重大重构。

1

你可以在最后加入一个虚拟令牌,当线程遇到这个令牌时退出。例如:

public void Crawl()
{
   int report = 0;
   while(true)
   {
       if(!(queue.Count == 0))      
       {   
          if(report > 0) Interlocked.Decrement(ref report);
          //dequeue     
          if(token == "TERMINATION")
             return;
          else
             //enqueue child links
       }
       else
       {              
          if(report == num_threads) // all threads have signaled empty queue
             queue.Enqueue("TERMINATION");
          else
             Interlocked.Increment(ref report); // this thread has found the queue empty
       }
    }
}

当然,我省略了enqueue/dequeue操作的锁。


我不认为那会解决问题。在排队虚拟令牌之前,你必须知道结束在哪里。 - Jim Mischel
@Jim Mischel:嗯,肯定有一种方法可以知道,比如没有更多的子链接需要处理。 - Tudor
我的观点是,他最初的问题本质上是“我怎么知道我到了结尾?”你的答案本质上是“当你到达结尾时,排队一个结束标记。” - Jim Mischel
假设您正在爬取有限数量的页面,那么您最终应该能够找出结束位置。如果您想在队列结束之前停止爬行,则您所问的问题实际上与该情况无关。 - Kiril
我已经拼凑出了一个不同的解决方案。请查看编辑。 - Tudor
显示剩余4条评论

0
线程可以通过引发事件或调用委托来发出已完成工作的信号。
static void Main(string[] args)
{
//enqueue parent links here
...
//then start crawling via threading
...
}

public void X()
{
    //block the threads until all of them are here
}

public void Crawl(Action x)
{
    //dequeue
    //get child links
    //enqueue child links
    //call x()
}

是的,因为子链接也可能是父链接,所以线程无法确切地知道它们的工作是否已经结束。 - user611333

0

如果你愿意使用任务并行库,那么就真的没有必要手动处理生产者-消费者问题。当你使用AttachToParent选项创建任务时,子任务将以这样一种方式与父任务链接,即在子任务完成之前,父任务不会完成。

class Program
{
    static void Main(string[] args)
    {
        var task = CrawlAsync("http://stackoverflow.com");
        task.Wait();
    }

    static Task CrawlAsync(string url)
    {
        return Task.Factory.StartNew(
            () =>
            {
                string[] children = ExtractChildren(url);
                foreach (string child in children)
                {
                    CrawlAsync(child);
                }
                ProcessUrl(url);
            }, TaskCreationOptions.AttachedToParent);
    }

    static string[] ExtractChildren(string root)
    {
      // Return all child urls here.
    }

    static void ProcessUrl(string url)
    {
      // Process the url here.
    }
}

您可以使用Parallel.ForEach来删除一些显式的任务创建逻辑。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接