多线程单生产者多消费者实现

3
我想实现单生产者和多消费者模式的多文件下载。
我的现有情况是: - 代码在循环中查找要下载的新链接 - 当找到新链接时,它调用下载函数 - 下载函数接受源文件路径和目标文件路径,并下载该文件。
我想做的是: - 同时下载X个文件(我不知道文件的总数) - 随时都应该能够同时下载X个文件-只要X个文件中的任何一个下载完成,调用函数就应该能够立即添加新的下载,然后立即开始下载。
具体实现:
  • 所以我有一个生产者函数,它不断向队列中添加新的下载(最多同时进行X个下载)
  • 多个X线程消耗下载并单独开始下载。一旦下载完成,生产者应该能够添加新的下载,这将产生新的线程。
请提供一个示例。
4个回答

5
对于这个P/C问题,你只需要使用BlockingCollection<T>即可。
//shared and thread-safe
static BlockingCollection<string> queue = new BlockingCollection<string>(100);

// Producer
queue.Add(fileName);  // will block when full

// Consumer
if (queue.TryTake(out fileName, timeOut))  // waits when empty
  ...

您需要使用超时和CancellationTokens进行微调。


当您出队一个项目并执行一些处理后,如何将结果返回给调用进程? - gdp
“调用进程”究竟是什么? - H H
我正在考虑实现一个队列来限制Web请求。所以我有一个方法,比如说DownloadUser(),从Web服务中获取请求并将其添加到队列中进行限流。那么我该如何将结果返回给DownloadUser()方法呢? - gdp
问题说明了“单生产者多消费者”的情况。而你的例子是一对一的。当消费者调用TryTake方法时,该项将从线程安全集合中移除。其他消费者没有机会获取到该项。 - Aaron Hudon
@ajhuddy - 其他消费者想要其他物品。这个答案适用于多个消费者。你可能将P/C与总线混淆了。 - H H
显示剩余2条评论

2

ReaderWriterLockSlim 类旨在实现此目的。

此外,查阅这个关于线程的精彩网站:

http://www.albahari.com/threading/part4.aspx#_Reader_Writer_Locks

示例代码源自上述网站。

class SlimDemo
{
  static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim();
  static List<int> _items = new List<int>();
  static Random _rand = new Random();

  static void Main()
  {
    new Thread (Read).Start();
    new Thread (Read).Start();
    new Thread (Read).Start();

    new Thread (Write).Start ("A");
    new Thread (Write).Start ("B");
  }

  static void Read()
  {
    while (true)
    {
      _rw.EnterReadLock();
      foreach (int i in _items) Thread.Sleep (10);
      _rw.ExitReadLock();
    }
  }

  static void Write (object threadID)
  {
    while (true)
    {
      int newNumber = GetRandNum (100);
      _rw.EnterWriteLock();
      _items.Add (newNumber);
      _rw.ExitWriteLock();
      Console.WriteLine ("Thread " + threadID + " added " + newNumber);
      Thread.Sleep (100);
    }
  }

  static int GetRandNum (int max) { lock (_rand) return _rand.Next(max); }
}

生产者 = 写入器,消费者 = 读取器。在线程命名法中,读取器/写入器是用于读取或写入共享数据(例如下载队列中的_items)的常见名称。 - Jakub Konecki
1
如何在Read()方法中处理完_items列表中的线程安全项后删除它们? - Odrai

1

在老板和工作人员之间的通信中使用并发集合。
如果您关心顺序,请使用ConcurrentQueue;否则可以使用ConcurrentBag。
老板向ConcurrentQueue添加(Add方法),而工作人员从队列中取出(Take方法)。如果需要代码,请告诉我。


0
我建议您研究一下任务并行库。它可以很干净地封装方法调用,并为您管理多个线程。

你确定任务并行库解决了访问共享数据的问题吗?请看这里http://msdn.microsoft.com/en-us/library/dd997392.aspx,阅读“避免写入共享内存位置”一节。 - Jakub Konecki

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接