C#多线程与插槽

7

我有这个函数,用于检查代理服务器,目前它仅检查一定数量的线程并等待所有线程完成后再开始下一组。是否可能在最大允许数的情况下,一旦一个线程完成就立即启动新的线程?

for (int i = 0; i < listProxies.Count(); i+=nThreadsNum)
{                              
    for (nCurrentThread = 0; nCurrentThread < nThreadsNum; nCurrentThread++)
    {
        if (nCurrentThread < nThreadsNum)
        {
           string strProxyIP = listProxies[i + nCurrentThread].sIPAddress;
           int nPort = listProxies[i + nCurrentThread].nPort;
                    tasks.Add(Task.Factory.StartNew<ProxyAddress>(() => CheckProxyServer(strProxyIP, nPort, nCurrentThread)));
        }
     }                

     Task.WaitAll(tasks.ToArray());

     foreach (var tsk in tasks)
     {
        ProxyAddress result = tsk.Result;
        UpdateProxyDBRecord(result.sIPAddress, result.bOnlineStatus);
     }

     tasks.Clear();                
}

你是在寻找这个吗?链接 - EJoshuaS - Stand with Ukraine
也许您正在寻找 ThreadPool:https://learn.microsoft.com/en-us/dotnet/api/system.threading.threadpool?view=netframework-4.8 或 Parallel.ForEach:https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreach?view=netframework-4.8 - Robert McKee
@EJoshuaS 不是很需要,因为继续任务/链会使用其父级的结果,我不需要将结果从一个线程传递到另一个线程。 - Mario
1
你没有正确使用任务。"检查代理"是I/O绑定的。很有可能你不需要超过1个线程来驱动它们并发执行。你不需要多线程或接受的答案使用任务并行库(这是用于CPU绑定操作的)。不要使用Task.Result,阅读异步最佳实践,并选择BionicCode的答案中的其中一种更优选项。 - Saeb Amini
4个回答

5
这似乎简单多了:
int numberProcessed = 0;
Parallel.ForEach(listProxies,
  new ParallelOptions { MaxDegreeOfParallelism = nThreadsNum },
  (p)=> {
    var result = CheckProxyServer(p.sIPAddress, s.nPort, Thread.CurrentThread.ManagedThreadId);
    UpdateProxyDBRecord(result.sIPAddress, result.bOnlineStatus);
    Interlocked.Increment(numberProcessed);
});

使用插槽:

var obj = new Object();
var slots = new List<int>();
Parallel.ForEach(listProxies,
  new ParallelOptions { MaxDegreeOfParallelism = nThreadsNum },
  (p)=> {
    int threadId = Thread.CurrentThread.ManagedThreadId;
    int slot = slots.IndexOf(threadId);
    if (slot == -1)
    {
      lock(obj)
      {
        slots.Add(threadId);
      }
      slot = slots.IndexOf(threadId);
    }
    var result = CheckProxyServer(p.sIPAddress, s.nPort, slot);
    UpdateProxyDBRecord(result.sIPAddress, result.bOnlineStatus);
});

我在那里采用了一些捷径来保证线程安全。你不需要进行常规的检查-锁定-检查操作,因为永远不会有两个线程尝试将相同的线程ID添加到列表中,所以第二次检查总是失败而且不需要。其次,出于同样的原因,我认为你不需要在外部IndexOf周围加锁。这使得它成为一个非常高效的并发例程,无论可枚举物品的数量如何,都很少使用锁(它应该只锁定nThreadsNum次)。


评论不是用于进行长时间讨论的;本次对话已经移动到聊天室 - Samuel Liew
你可以使用以下代码简化: int slot = (int)Task.CurrentId; slot 的值将在 1 到 nThreadsNum 之间。 - jcmeyrignac

3
另一个解决方案是使用SemaphoreSlim或使用BlockinCollection<T>的生产者-消费者模式。这两种解决方案都支持取消操作。
private async Task CheckProxyServerAsync(IEnumerable<object> proxies)
{
  var tasks = new List<Task>();
  int currentThreadNumber = 0;
  int maxNumberOfThreads = 8;

  using (semaphore = new SemaphoreSlim(maxNumberOfThreads, maxNumberOfThreads))
  {
    foreach (var proxy in proxies)
    {
      // Asynchronously wait until thread is available if thread limit reached
      await semaphore.WaitAsync();

      string proxyIP = proxy.IPAddress;
      int port = proxy.Port;
      tasks.Add(Task.Run(() => CheckProxyServer(proxyIP, port, Interlocked.Increment(ref currentThreadNumber)))
        .ContinueWith(
          (task) =>
          {
            ProxyAddress result = task.Result;

            // Method call must be thread-safe!
            UpdateProxyDbRecord(result.IPAddress, result.OnlineStatus);

            Interlocked.Decrement(ref currentThreadNumber);

            // Allow to start next thread if thread limit was reached
            semaphore.Release();
          },
          TaskContinuationOptions.OnlyOnRanToCompletion));
    }

    // Asynchronously wait until all tasks are completed
    // to prevent premature disposal of semaphore
    await Task.WhenAll(tasks);
  }
}

生产者-消费者模式
// Uses a fixed number of same threads
private async Task CheckProxyServerAsync(IEnumerable<ProxyInfo> proxies)
{
  var pipe = new BlockingCollection<ProxyInfo>();
  int maxNumberOfThreads = 8;
  var tasks = new List<Task>();

  // Create all threads (count == maxNumberOfThreads)
  for (int currentThreadNumber = 0; currentThreadNumber < maxNumberOfThreads; currentThreadNumber++)
  {
    tasks.Add(
      Task.Run(() => ConsumeProxyInfo(pipe, currentThreadNumber)));
  }

  proxies.ToList().ForEach(pipe.Add);
  pipe.CompleteAdding();

  await Task.WhenAll(tasks);
}

private void ConsumeProxyInfo(BlockingCollection<ProxyInfo> proxiesPipe, int currentThreadNumber)
{
  while (!proxiesPipe.IsCompleted)
  {
    if (proxiesPipe.TryTake(out ProxyInfo proxy))
    {
      int port = proxy.Port;
      string proxyIP = proxy.IPAddress;
      ProxyAddress result = CheckProxyServer(proxyIP, port, currentThreadNumber); 

      // Method call must be thread-safe!
      UpdateProxyDbRecord(result.IPAddress, result.OnlineStatus);
    }
  }
}

这是一个不错的解决方案,一旦你修复了nCurrentThread,就可以很好地使用SemaphoreSlim。 - Robert McKee
@RobertMcKee,需要修复什么问题,您能告诉我吗?我认为nCurrentThread只是线程计数器,对吧?我的意思是总线程数还是相对于可用线程(它是否也需要递减)? - BionicCode
他将其用作线程索引,因此它应该从0到最大线程数范围内变化,这样每个当前运行的线程在该范围内都会有一个独特的值。 - Robert McKee
@RobertMcKee 感谢您的提示。我已经调整了它。 - BionicCode

1
如果我正确理解了您的问题,那么使用await Task.WhenAny实际上非常简单。基本上,您需要保留所有正在运行的任务的集合。一旦达到一定数量的正在运行的任务,您就等待一个或多个任务完成,然后从集合中删除已完成的任务并继续添加更多任务。
以下是我的示例:
        var tasks = new List<Task>();

        for (int i = 0; i < 20; i++)
        {
            // I want my list of tasks to contain at most 5 tasks at once
            if (tasks.Count == 5)
            {
                // Wait for at least one of the tasks to complete
                await Task.WhenAny(tasks.ToArray());

                // Remove all of the completed tasks from the list
                tasks = tasks.Where(t => !t.IsCompleted).ToList();
            }

            // Add some task to the list
            tasks.Add(Task.Factory.StartNew(async delegate ()
                {
                    await Task.Delay(1000);
                }));
        }

@MarioM 在你从列表中移除任务之前,你可以对已完成的任务进行LINQ查询并获取它们的结果。 - EJoshuaS - Stand with Ukraine
是否可以有一个插槽ID?这样每个线程都知道它正在运行的线程编号吗?这样我就可以在网格中为每个线程显示进度状态。 - Mario
@MarioM 你能在列表中使用索引吗? - EJoshuaS - Stand with Ukraine
不行,因为索引将达到5000,而我只有32个线程。 - Mario
同时我无法使用异步代理将参数添加到函数中。 tasks.Add(Task.Factory.StartNew<ProxyAddress>(async delegate () => CheckProxyServer(strProxyIP, nPort, nCurrentThread))); - Mario

1
我建议您略微改变方法。不要启动和停止线程,而是将代理服务器数据放入并发队列中,每个代理服务器一个项目。然后创建一定数量的线程(或异步任务)来处理队列。这样更有可能提供平稳的性能(您不会一遍又一遍地启动和停止线程,这会产生开销),而且在我看来编码起来更容易。
一个简单的例子:
class ProxyChecker
{
    private ConcurrentQueue<ProxyInfo> _masterQueue = new ConcurrentQueue<ProxyInfo>();

    public ProxyChecker(IEnumerable<ProxyInfo> listProxies)
    {
        foreach (var proxy in listProxies)
        {
            _masterQueue.Enqueue(proxy);
        }
    }

    public async Task RunChecks(int maximumConcurrency)
    {
        var count = Math.Max(maximumConcurrency, _masterQueue.Count);
        var tasks = Enumerable.Range(0, count).Select( i => WorkerTask() ).ToList();
        await Task.WhenAll(tasks);
    }

    private async Task WorkerTask()
    {
        ProxyInfo proxyInfo;
        while ( _masterList.TryDequeue(out proxyInfo))
        {
            DoTheTest(proxyInfo.IP, proxyInfo.Port)
        }
    }
} 

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接