ManualResetEvent没有等待线程池完成。

3

我有一份需要处理的批次列表,需要一直处理。
我想并行处理每个块(5个),当一个块完成后再移动到下一个块。
但由于某种原因,下面的代码不会等待当前块被处理完毕就继续执行。

while (true)
{
    foreach (string[] urlsArr in chunks)
    { 
        int i = 0;
        foreach (var url in urlsArr)
        {
            ThreadPool.QueueUserWorkItem(x =>
            {
                ProccessUrl(url, config, drivers[i]);
                _resetEvent.Set();
                i++;
            });
        }
        _resetEvent.WaitOne();// this is not really waiting.
    }
}

3
如果您想逐个处理,则不要并行地开始所有工作。只需在循环中逐个处理即可。 - usr
3个回答

1
这是一个带有Tasks(async/await)的版本。
while (true)
        {
            foreach (string[] urlsArr in chunks)
            {
                Task[] tasks = new Task[urlsArr.Length];
                for (int i = 0; i < urlsArr.Length; i++)
                {
                    var url = urlsArr[i];
                    var driver = drivers[i];
                    tasks[i] = Task.Run(() => { ProccessUrl(url, config, driver); });
                }

                await Task.WhenAll(tasks);
            }
        }

请注意,它还解决了原始代码中“i”变量的问题,该变量未以线程安全的方式增加(可以使用Interlocked.Increment解决)。
如果您的代码不是异步的,则可以在线程中等待任务完成(但这会阻塞)。
Task.WhenAll(tasks).Wait();

1
请看Semaphore或其精简版。Semaphore将允许您始终只运行5个线程。一旦任何一个正在运行的线程完成,它就可以接手新的工作。这更有效率,特别是在工作负载不均匀的情况下。考虑一个项目需要一个小时处理,而其他4个只需要一秒钟的情况。在这种情况下,这4个线程将等待最后一个完成才能开始处理其他任务。
例如,请参见需要了解SemaphoreSlim的用法
在您的代码中,问题在于您只有一个等待句柄和5个线程。当其中任何一个5个正在运行的线程完成工作时,它将设置等待句柄,从而允许外部循环继续进行,启动另外5个线程。到目前为止,内部循环的前4个线程可能已经完成,并且其中任何一个都可以再次设置等待句柄!现在,您看到问题了吗?
根据Hans的评论,如果在一个批次中的工作项之间存在依赖关系,即必须在开始下一批之前完成所有工作项,则应查看CountDownEvent

Semaphore/Slim会给他现在完全相同的问题。你需要一个反向信号量,即当计数达到零时应该被触发。CountDownEvent很适合这个问题。 - Hans Passant

0

我认为你可以简化整个过程,并利用Parallel.ForEach()来管理线程并将并发度限制在5个。

如果您运行以下示例代码,您将看到虚拟URL正在以5个一组的方式处理,因为并发线程数被限制为5个。

如果您这样做,您将不需要自己的分块逻辑:

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication2
{
    class Program
    {
        static void Main()
        {
            // Make some pretend URLs for this demo.

            string[] urls = Enumerable.Range(1, 100).Select(n => n.ToString()).ToArray();

            // Use Parallel.ForEach() along with MaxDegreeOfParallelism = 5 to process
            // these using 5 threads maximum:

            Parallel.ForEach(
                urls,
                new ParallelOptions{MaxDegreeOfParallelism = 5},
                processUrl
            );
        }

        static void processUrl(string url)
        {
            Console.WriteLine("Processing " + url);
            Thread.Sleep(1000);
            Console.WriteLine("Processed " + url);
        }
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接