如何保证一定数量的线程始终运行

7

好的,这是我的问题。我想要启动线程直到达到一定数量,比如100个。所以它会开始启动线程并持续检查正在运行的线程数量。当最大值达到时,它将停止启动新线程。但是使用适当的检查间隔或完成线程信号后,它将启动新线程。

通过这种方式,我将始终拥有特定数量的运行中的线程。

我使用sleep和permanent while来实现这一点。因此,我会在给定的间隔内不断检查总运行线程数,如果线程已经完成,则丢弃它并启动一个新线程。

但是我的解决方案不被视为正确的方式。我认为如果完成的线程发出信号,然后检查器会在我们低于最大线程阈值时启动新线程,那么会更好。

我看过许多线程池示例,但其中大部分都没有包含任何排队池,并带有最大数量的运行线程。我的意思是,它们只是一直启动线程,直到完成为止。但是,假设我有50万个要收集的URL。我不能只使用线程池中的for循环启动所有线程。

平台是C#4.5 WPF应用程序。

以下是我的解决方案。实际上,我正在寻找更好的解决方案,而不是改进这个。

private void Button_Click_4(object sender, RoutedEventArgs e)
{
    Task.Factory.StartNew(() =>
    {
        startCrawler();
    });
}

void startCrawler()
{
    int irMaximumThreadcount = 100;
    List<Task> lstStartedThreads = new List<Task>();
    while (true)
    {
        for (int i = 0; i < lstStartedThreads.Count; i++)
        {
            if (lstStartedThreads[i].IsCompleted == true)
            {
                lstStartedThreads[i].Dispose();
                lstStartedThreads.RemoveAt(i);
            }
        }

        if (lstStartedThreads.Count < irMaximumThreadcount)
        {
            var vrTask = Task.Factory.StartNew(() =>
            {
                func_myTask();
            });
            lstStartedThreads.Add(vrTask);
        }

        System.Threading.Thread.Sleep(50);
    }
}

void func_myTask()
{

}

“我不能只用线程池的for循环启动它们。” - 你真的尝试过吗?假设通过启动多个线程来加快整体互联网连接速度并不是“正确的方式”。此外,考虑使用异步操作-不需要那么多线程...除非你有像32核机器这样的东西... - Alexei Levenkov
5个回答

6

个人而言,我会使用PLINQ来处理这个问题,具体地说是使用WithDegreeOfParallelism方法,该方法可以限制并行执行的数量。

private IEnumerable<Action> InfiniteFunctions()
{
    while(true)
    {
        yield return func_myTask;
    }
}

private void Button_Click_4(object sender, RoutedEventArgs e)
{
    int irMaximumThreadcount = 100;
    InfiniteFunctions()
        .AsParallel()
        .WithDegreeOfParallelism(irMaximumThreadcount)
        .ForAll(f => f());
}

编辑:实际阅读文档后,似乎irMaximumThreadCount只能最多为64,因此请注意。

编辑2:好的,仔细看了一下,似乎Parallel.ForEach需要一个ParallelOptions参数,其中包括一个没有限制的MaxDegreeOfParallelism属性-查看它。所以你的代码可能是这样的:

private void CrawlWebsite(string url)
{
    //Implementation here
}

private void Button_Click_4(object sender, RoutedEventArgs e)
{
    var options = new ParallelOptions() 
    { 
        MaxDegreeOfParallelism = 2000 
    };

    Parallel.ForEach(massiveListOfUrls, options, CrawlWebsite);
}

现在这很有趣。你说这种方法可以用于爬取50万个页面的例子。让我试试 :) - Furkan Gözükara
哦,那对我没用 :) 我正在启动2000个线程来检查活动代理,例如^^即使任务管理器显示490个线程。我不知道为什么不是2000个 :) - Furkan Gözükara
注意我的编辑 - 并行最大只有64。是的,你可以循环遍历你的500,000个项目列表,并对每个项目执行该函数。 - Felix
是的...问题在于有2000个线程等待请求有点低效。让我再考虑一下这些额外信息来回答这个问题。 - Felix
是的,再次非常好的解决方案,但期望它在这种情况下不起作用。假设爬行失败了,我希望它重新尝试直到完成。由于这是foreach循环,我将无法重新爬行它。 - Furkan Gözükara
显示剩余4条评论

4
您混淆了任务和线程。任务不是线程。每个任务都有自己的线程没有保证
实际上TPL(任务并行库)是一种队列。这意味着您可以为每个FuncAction对象创建和启动任务。实际创建的线程数量很难控制
但是,您可以使用很少的开销创建许多任务,因为TPL会将它们排队,并应用进一步的逻辑来平衡线程池中的工作。
如果需要依次执行一些任务,可以使用Task.ContinueWith将它们排队。也可以使用Task.Factory.ContinueWhenAnyTask.Factory.ContinueWhenAll启动新任务。
这也是如何控制并行任务数量的线索:只需创建所需数量的任务,并使用ContinueWhenAny将其余任务排队。每次任务结束后都会启动下一个任务。
再次强调:TPL会在线程池中平衡工作负载。但您仍需考虑其他资源的使用,例如磁盘I/O或Internet连接。有很多试图同时使用相同资源的任务可能会严重减慢程序速度。

我有很多资源。每秒850 MB的I/O读写速度,50兆比特的光纤连接。无论如何,这是一些有用的信息,请点赞 :) - Furkan Gözükara

1

.NET 4.0引入了几个具有内置并发管理的集合,这对于这种情况应该是理想的。阻塞集合比在while循环中睡眠更有效率。然后,您只需生成x个线程从阻塞队列中读取。

BlockingCollection<string> queue = new BlockingCollection<string>(listOfUrls);

for (int x=0; x < MaxThreads; x++)
{
    Task.Factory.StartNew(() => 
    {
        while (true)
        {
            string url = queue.Take(); // blocks until url is available
            // process url;
        }
    }, TaskCreationOptions.LongRunning);
}

你可以将任务标记为长时间运行,这样它将创建自己的线程而不使用线程池。如果你需要先进先出,则可以将ConcurrentQueue<T>传递给阻塞集合构造函数。http://msdn.microsoft.com/en-us/library/dd287085.aspx

0

您可以自行管理任务/线程池,并等待任何线程完成并立即启动新线程。

MAX_THREAD_ALLOWED = 100;
List<Task> tasks = new List<Task>();
for (int i = 0; i < 1000; i++)
{
    tasks.Add(Task.Run(() => { Foo(i); }));
    if (i == MAX_THREAD_ALLOWED)
    {
        Task.WaitAny(tasks.ToArray());
        MAX_THREAD_ALLOWED++;
    }
}

0

虽然不是确切的答案,但我认为这可能会指引你朝着正确的方向。

首先,看一下Thread.Join,特别是在该页面底部给出的简单示例。这种方法优于Thread.Sleep(),更适合你的目的。我想到的是加入“manager”线程而不是休眠。

第二个选项可能适合你的目的,也可能不适合,那就是新的Tasks库。由于你正在使用框架的最新版本,因此可以选择此选项,但我猜测你无法控制Tasks库创建的实际线程数。它根据底层调度程序自动选择该值。但是,有一个名为ParallelOptions.MaxDegreeOfParallelism的选项听起来很有趣。


据我所知,线程加入(thread join)用于等待所有任务完成。如果我错了?如果是这样,我该怎么使用它?我不需要等待所有任务。当一个任务完成后,另一个任务将立即开始,因此始终会有一定数量的任务正在运行。 - Furkan Gözükara
不是100%确定,但我认为Join只会停止调用线程。另一个想法是加入新创建的工作线程,以便它们在当前运行的线程之一发出完成信号后立即开始工作,这样管理器就不必一遍又一遍地检查了。 - dotNET
不行,因为线程是独立完成的。先启动的可能最后完成,或者最后启动的可能先完成。 - Furkan Gözükara

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接