.NET自定义线程池和分离实例

12
什么是最受推荐的.NET自定义线程池,可以拥有单独的实例,即每个应用程序可使用多个线程池? 我需要一个无限队列大小(构建爬虫),并且需要为我正在爬取的每个站点并行运行一个单独的线程池。
编辑: 我需要尽可能快地从这些站点中获取信息,每个站点使用单独的线程池会使我能够控制任何给定时间内在每个站点上工作的线程数(不超过2-3)。
谢谢, Roey

1
为什么需要为每个站点运行单独的线程池? - AnthonyWJones
1
请花点时间观看Daniel Moth的视频:http://channel9.msdn.com/pdc2008/TL26/。您会发现无限队列并不是实现无限性能的道路。 - vgru
我同意安东尼的观点,我认为你不需要多个线程池。线程池的作用在于管理和平衡整个系统的工作负载。如果你有多个池,你最终只会过载你的机器。此外,不要忘记线程创建也会产生一定开销。这就是线程池的作用——它可以精心地管理线程数量,将工作平均分配给这些线程,使其运行在最佳状态下。 - Simon P Stevens
1
我需要为每个站点创建一个单独的线程池,以便在每个站点上不使用超过2-3个爬行线程。使用.NET线程池无法保证这一点(因为池中的所有线程可能同时处理来自同一站点的任务)。有什么其他方法可以实现这一点吗? - Roey
6个回答

8
我相信 Smart Thread Pool 可以做到这一点。它的ThreadPool类被实例化,因此您应该能够创建和管理所需的单独站点特定实例。

3

Ami Bar编写了一个出色的智能线程池,可以实例化。

在这里查看:这里


7
回答延迟了2分钟。AdamRalphs的回答可能还没有出现。 - Simon

1
使用BlockingCollection可以作为线程队列使用。以下是它的一个实现。 更新于2018-04-23:
public class WorkerPool<T> : IDisposable
{
    BlockingCollection<T> queue = new BlockingCollection<T>();
    List<Task> taskList;
    private CancellationTokenSource cancellationToken;
    int maxWorkers;
    private bool wasShutDown;

    int waitingUnits;

    public WorkerPool(CancellationTokenSource cancellationToken, int maxWorkers)
    {
        this.cancellationToken = cancellationToken;
        this.maxWorkers = maxWorkers;
        this.taskList = new List<Task>();
    }
    public void enqueue(T value)
    {
        queue.Add(value);
        waitingUnits++;
    }
    //call to signal that there are no more item
    public void CompleteAdding()
    {
        queue.CompleteAdding();          
    }

    //create workers and put then running
    public void startWorkers(Action<T> worker)
    {
        for (int i = 0; i < maxWorkers; i++)
        {
            taskList.Add(new Task(() =>
            {
                string myname = "worker " + Guid.NewGuid().ToString();

                try
                {
                    while (!cancellationToken.IsCancellationRequested)
                    {                     
                        var value = queue.Take();
                        waitingUnits--;
                        worker(value);
                    }
                }
                catch (Exception ex) when (ex is InvalidOperationException)  //throw when collection is closed with  CompleteAdding method. No pretty way to do this.
                {
                    //do nothing
                }
            }));
        }

        foreach (var task in taskList)
        {
            task.Start();
        }
    }

    //wait for all workers to be finish their jobs
    public void await()
    {
        while (waitingUnits >0 || !queue.IsAddingCompleted)
            Thread.Sleep(100);

        shutdown();
    }

    private void shutdown()
    {
        wasShutDown = true;
        Task.WaitAll(taskList.ToArray());            
    }

    //case something bad happen dismiss all pending work
    public void Dispose()
    {
        if (!wasShutDown)
        {
            queue.CompleteAdding();
            shutdown();
        }
    }
}

然后像这样使用:
WorkerPool<int> workerPool = new WorkerPool<int>(new CancellationTokenSource(), 5);

workerPool.startWorkers(value =>
{
    log.Debug(value);
});
//enqueue all the work
for (int i = 0; i < 100; i++)
{
    workerPool.enqueue(i);
}
//Signal no more work
workerPool.CompleteAdding();

//wait all pending work to finish
workerPool.await();

您可以创建新的WorkPool对象,随意添加投票,数量不限。


1

2
这用于并行化到多个 CPU 核心,然而它并不能解决我的问题,因为各个 TaskManager 无法告诉我它们何时完成所有任务的运行。 - Roey
已经过了很长时间,所以链接已经失效(第一种情况),或者现在指向一个通用页面(第二种情况)。TPL文档位于https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/task-parallel-library-tpl。实际上,它似乎在幕后使用`System.Threading.ThreadPool`,这意味着它只为每个应用程序提供_一个_池。 - mklement0

0

这里有一个免费的NuGet库:CodeFluentRuntimeClient,其中包含一个CustomThreadPool类,您可以重复使用它。它非常灵活,您可以更改池线程的优先级、数量、COM公寓状态、名称(用于调试)以及文化等。


0
另一种方法是使用Dataflow Pipeline。我之所以后来添加了这些答案,是因为我发现对于这种问题,Dataflows是一种更好的方法,即存在多个线程池的问题。它们提供了一种更灵活和结构化的方法,并且可以轻松地进行垂直扩展。
您可以将代码分成一个或多个块,用Dataflows链接它们,然后让Dataflow引擎根据CPU和内存可用性分配线程。
我建议将其分成3个块,一个用于准备查询网站页面,一个用于访问网站页面,最后一个用于分析数据。 这样,慢速块(获取)可能会分配更多的线程来补偿。
以下是Dataflow设置的外观:
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

prepareBlock.LinkTo(get, linkOptions);
getBlock.LinkTo(analiseBlock, linkOptions);

数据将从prepareBlock流向getBlock,然后流向analiseBlock。 块之间的接口可以是任何类,只需相同即可。在Dataflow Pipeline上查看完整示例。

使用Dataflow将类似于以下内容:

 while ...{
    ...
    prepareBlock.Post(...); //to send data to the pipeline
 }
 prepareBlock.Complete(); //when done
 analiseBlock.Completion.Wait(cancellationTokenSource.Token); //to wait for all queues to empty or cancel 

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接