限制线程数量

5
我有一个包含要下载项目的列表。我使用一个for循环来遍历这个列表。
对于列表中的每个项目,我都会启动一个新线程来引用该项目。我的问题是我想同时限制最大下载量。
for (int i = downloadList.Count - 1; i >= 0; i--)
{
    downloadItem item = downloadList[i];
    if (item.Status != 1 && item.Status != 2)
    {
        ThreadStart starter = delegate { this.DownloadItem(ref item); };
        Thread t = new Thread(starter);
        t.IsBackground = true;
        t.Name = item.Name;
        t.Priority = ThreadPriority.Normal;
        t.Start();
    }
}

我看到有关线程池的内容,但是我无法引用我的项目。有人能帮帮我吗?谢谢!:)

编辑:

我尝试了这个:

ThreadPool.SetMaxThreads(maxDownloads, maxDownloads);
ThreadPool.SetMinThreads(maxDownloads, maxDownloads);
ThreadPool.QueueUserWorkItem(DownloadItem, ref item);

我不知道如何在这个线程中引用我的downloadItem.....


一个由线程池服务的工作队列听起来是正确的选择 - 你能更具体地说明一下你在尝试使用它时遇到的问题吗? - pdbartlett
1
查看线程池的使用方法:http://msdn.microsoft.com/zh-cn/library/3dasc8as(VS.80).aspx 您的代码中没有使用它们。 - Cipi
你肯定会更好地学习如何使用线程池,而不是自己创建一堆短暂的线程。 - Will Dean
4个回答

8

如果您使用的是.NET 4,我强烈建议使用Parallel.ForEach(可能在downloadList.Reverse()上进行)。

因此,可以按照以下方式进行:

Parallel.ForEach(downloadList.Reverse(), 
                 new ParallelOptions { MaxDegreeOfParallelism = 8 },
                 item => this.DownloadItem(item));

如果您不想让调用线程阻塞,当然可以使用QueueUserWorkItem来调用此函数。


如果你正在使用.NET4并且不想使用Parallel.ForEach,那么新的Task功能也值得一看,因为它解决了直接使用线程池时遇到的许多问题。 - Will Dean
注意:不要假设Parallel.For,Parallel.ForEach和Parallel.ForAll不是并行执行的。MSDN 如果需要确保并行执行,则应谨慎使用。 - Ashitakalax

6

我曾在 .Net 3.5 中通过创建线程并将它们加载到队列中解决了这个问题。然后,我从队列中读取一个线程,启动它,并增加正在运行的线程计数。我一直这样做,直到达到上限。

每个线程完成时都会调用回调方法,该方法会减少运行计数并向队列读取器发出信号以启动更多线程。为了获得额外的控制,您可以使用字典来跟踪正在运行的线程,以 ManagedThreadId 为键,以便可以向线程发送停止早期或报告进度的信号。

示例控制台应用程序:

using System;
using System.Collections.Generic;
using System.Threading;

namespace ThreadTest
{
    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supervisor = new Supervisor();
            supervisor.LaunchThreads();
            Console.ReadLine();
            supervisor.KillActiveThreads();
            Console.ReadLine();
        }

        public delegate void WorkerCallbackDelegate(int threadIdArg);
        public static object locker = new object();

        class Supervisor
        {
            Queue<Thread> pendingThreads = new Queue<Thread>();
            Dictionary<int, Worker> activeWorkers = new Dictionary<int, Worker>();

            public void LaunchThreads()
            {
                for (int i = 0; i < 20; i++)
                {
                    Worker worker = new Worker();
                    worker.DoneCallBack = new WorkerCallbackDelegate(WorkerCallback);
                    Thread thread = new Thread(worker.DoWork);
                    thread.IsBackground = true;
                    thread.Start();
                    lock (locker)
                    {
                        activeWorkers.Add(thread.ManagedThreadId, worker);
                    }
                }
            }

            public void KillActiveThreads()
            {
                lock (locker)
                {
                    foreach (Worker worker in activeWorkers.Values)
                    {
                        worker.StopWork();
                    }
                }
            }

            public void WorkerCallback(int threadIdArg)
            {
                lock (locker)
                {
                    activeWorkers.Remove(threadIdArg);
                    if (activeWorkers.Count == 0)
                    {
                        Console.WriteLine("no more active threads");
                    }
                }
            }
        }

        class Worker
        {
            public WorkerCallbackDelegate DoneCallBack { get; set; }
            volatile bool quitEarly;

            public void DoWork()
            {
                quitEarly = false;
                Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString() + " started");
                DateTime startTime = DateTime.Now;
                while (!quitEarly && ((DateTime.Now - startTime).TotalSeconds < new Random().Next(1, 10)))
                {
                    Thread.Sleep(1000);
                }
                Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString() + " stopped");
                DoneCallBack(Thread.CurrentThread.ManagedThreadId);
            }

            public void StopWork()
            {
                quitEarly = true;
            }
        }
    }
}

1

处理这个问题的最佳方法是只创建maxDownloads数量的线程。将所有工作项放入队列中,让线程相互竞争以确定哪个线程处理每个工作项。

var queue = new ConcurrentQueue<downloadItem>(downloadList);
for (int i = 0; i < Math.Min(maxDownloads, queue.Count))
{
  var thread = new Thread(
    () =>
    {
      while (true)
      {
        downloadItem item = null;
        if (queue.TryDequeue(out item))
        {
          // Process the next work item.
          DownloadItem(item);
        }
        else
        {
          // No more work items are left.
          break;
        }
      }
    });
    thread.IsBackground = true;
    thread.Start();
}

你也可以使用信号量来限制处理工作项的线程数量。当实际线程数未知时,这尤其有用,例如在使用ThreadPool时。

var semaphore = new Semaphore(maxDownloads, maxDownloads);
for (int i = 0; i < downloadList.Count; i++)
{
  downloadItem item = downloadList[i];
  ThreadPool.QueueUserWorkItem(
    (state) =>
    {
      semaphore.WaitOne();
      try
      {
        DownloadItem(item);
      }
      finally
      {
        semaphore.Release();
      }
    });
}

我对这两种方法都不是特别喜欢。第一种方法的问题在于创建了非固定数量的线程。通常建议避免在for循环中创建线程,因为这往往不会很好地扩展。第二个问题是信号量将阻塞一些ThreadPool线程。这也不被建议,因为您实际上正在声明其中一个线程,然后什么也没有做。这可能会影响到其他无关任务的性能,这些任务恰好共享ThreadPool。我认为在这种情况下,这两个选项都可以,因为打造一个更可扩展的模式比它值得的工作要多。


0

我不明白你为什么要使用 ref 关键字。在 C# 中,对象默认是按引用传递的,在你原来的代码中,item 在传递给 DownloadItem 后没有被使用。因此,我建议你使用你尝试过的 ThreadPool 方法,但不要使用 ref 参数。

希望能对你有所帮助。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接