ConcurrentQueue and Parallel.ForEach

4
我有一个包含URL列表的ConcurrentQueue,我需要获取它们的源代码。当使用Parallel.ForEach和ConcurrentQueue对象作为输入参数时,Pop方法无法正常工作(应返回字符串)。
我正在使用设置了最大并行度为4的Parallel。我真的需要限制并发线程数。使用队列和并行处理是否多余?
提前感谢您的帮助。
// On the main class
var items = await engine.FetchPageWithNumberItems(result);
// Enqueue List of items
itemQueue.EnqueueList(items);
var crawl = Task.Run(() => { engine.CrawlItems(itemQueue); });

// On the Engine class
public void CrawlItems(ItemQueue itemQueue)
{
Parallel.ForEach(
            itemQueue,
            new ParallelOptions {MaxDegreeOfParallelism = 4},
            item =>
            {

                var worker = new Worker();
                // Pop doesn't return anything
                worker.Url = itemQueue.Pop();
                /* Some work */
             });
 }

// Item Queue
class ItemQueue : ConcurrentQueue<string>
    {
        private ConcurrentQueue<string> queue = new ConcurrentQueue<string>();

        public string Pop()
        {
            string value = String.Empty;
            if(this.queue.Count == 0)
                throw new Exception();
            this.queue.TryDequeue(out value);
            return value;
        }

        public void Push(string item)
        {
            this.queue.Enqueue(item);
        }

        public void EnqueueList(List<string> list)
        {
            list.ForEach(this.queue.Enqueue);
        }
    }

1
分享你的进展... - andrei.ciprian
1
ItemQueue 不应既派生自 ConcurrentQueue 也包含一个 ConcurrentQueue,请选其一。 - Scott Chamberlain
1
@Zroq:由于从URL下载源代码是一个I/O绑定的操作,我必须声明并行性是错误的工具。异步并发将使用更少的资源,并且同样快速。 - Stephen Cleary
@StephenCleary,使用SemaphoreSlim和async/await来限制活动线程数是一种有效的方法,还有其他更高效的方法吗? - Zroq
@Zroq:我会使用SemaphoreSlim来实现async - Stephen Cleary
2个回答

6
如果您只是从单个线程中首先向其中添加项目,然后在Parallel.ForEach()中迭代它,则不需要ConcurrentQueue。普通的List就足够了。
此外,您的ItemQueue实现非常可疑:
- 它继承自ConcurrentQueue并且还包含另一个ConcurrentQueue。这没有多大意义,而且令人困惑和低效。 - ConcurrentQueue上的方法被设计得非常小心以确保线程安全。而您的Pop()方法不是线程安全的。可能发生的情况是:您检查Count,注意到它是1,然后调用TryDequeue(),但由于另一个线程在两次调用之间从队列中删除了该项,因此不会得到任何值(即value将为null)。

0
问题出在CrawlItems方法上,因为你不应该在ForEach方法提供的操作中调用Pop。原因是该操作在每个弹出的项上被调用,因此该项已经被弹出。这就是为什么该操作有一个“item”参数的原因。
我假设你得到了null,因为所有的项都已经被其他线程通过ForEach方法弹出了。
因此,你的代码应该像这样:
public void CrawlItems(ItemQueue itemQueue)
{
    Parallel.ForEach(
        itemQueue,
        new ParallelOptions {MaxDegreeOfParallelism = 4},
        item =>
        {
            worker.Url = item;
            /* Some work */
         });
}

在执行 itemQueue.EnqueueList(items); 后,我已经确认队列确实有 41 个项目。 - Zroq
请问您能否分享ItemQueue的实现方式? - yonisha

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接