如何确保数据流块仅在需要时创建线程?

21
我使用TPL Dataflow API编写了一个小型管道,它从多个线程接收数据并对其进行处理。

设置1

当我将其配置为每个块使用MaxDegreeOfParallelism = Environment.ProcessorCount(在我的情况下为8)时,我注意到它会在多个线程中填充缓冲区,并且直到跨所有线程接收到大约1700个元素后才开始处理第二个块。您可以在此处看到它的运行情况。

设置2

当我将MaxDegreeOfParallelism = 1时,我注意到所有元素都在单个线程上接收,并且在接收到大约40个元素后,发送已经开始处理了。数据在此

设置3

当我将MaxDegreeOfParallelism = 1并在发送每个输入之前引入1000ms的延迟时,我注意到元素会在接收到它们后立即被发送,并且每个接收到的元素都会放在单独的线程上。数据在此
到目前为止是设置。我的问题如下:
  1. 当我比较设置1和设置2时,我注意到在串行处理与并行处理相比(即使考虑到并行处理有8倍的线程),处理元素的速度要快得多。是什么导致了这种差异?

  2. 由于这将在ASP.NET环境中运行,我不希望产生不必要的线程,因为它们都来自单个线程池。正如设置3所示,即使只有少量数据,它仍然会分布在多个线程上。这也很令人惊讶,因为从设置1中我会认为数据按顺序在线程之间传递(请注意前50个元素都进入线程16)。我能否确保它仅在需要时创建新线程?

  3. 还有另一个概念称为BufferBlock<T>。如果TransformBlock<T>已经排队输入,那么用BufferBlock替换我的管道中的第一步(ReceiveElement)会有什么实际区别?


class Program
{
    static void Main(string[] args)
    {
        var dataflowProcessor = new DataflowProcessor<string>();
        var amountOfTasks = 5;
        var tasks = new Task[amountOfTasks];

        for (var i = 0; i < amountOfTasks; i++)
        {
            tasks[i] = SpawnThread(dataflowProcessor, $"Task {i + 1}");
        }

        foreach (var task in tasks)
        {
            task.Start();
        }

        Task.WaitAll(tasks);
        Console.WriteLine("Finished feeding threads"); // Needs to use async main
        Console.Read();
    }

    private static Task SpawnThread(DataflowProcessor<string> dataflowProcessor, string taskName)
    {
        return new Task(async () =>
        {
            await FeedData(dataflowProcessor, taskName);
        });
    }

    private static async Task FeedData(DataflowProcessor<string> dataflowProcessor, string threadName)
    {
        foreach (var i in Enumerable.Range(0, short.MaxValue))
        {
            await Task.Delay(1000); // Only used for the delayedSerialProcessing test
            dataflowProcessor.Process($"Thread name: {threadName}\t Thread ID:{Thread.CurrentThread.ManagedThreadId}\t Value:{i}");
        }
    }
}


public class DataflowProcessor<T>
{
    private static readonly ExecutionDataflowBlockOptions ExecutionOptions = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount
    };

    private static readonly TransformBlock<T, T> ReceiveElement = new TransformBlock<T, T>(element =>
    {
        Console.WriteLine($"Processing received element in thread {Thread.CurrentThread.ManagedThreadId}");
        return element;
    }, ExecutionOptions);

    private static readonly ActionBlock<T> SendElement = new ActionBlock<T>(element =>
    {
        Console.WriteLine($"Processing sent element in thread {Thread.CurrentThread.ManagedThreadId}");
        Console.WriteLine(element);
    }, ExecutionOptions);

    static DataflowProcessor()
    {
        ReceiveElement.LinkTo(SendElement);

        ReceiveElement.Completion.ContinueWith(x =>
        {
            if (x.IsFaulted)
            {
                ((IDataflowBlock) ReceiveElement).Fault(x.Exception);
            }
            else
            {
                ReceiveElement.Complete();
            }
        });
    }


    public void Process(T newElement)
    {      
        ReceiveElement.Post(newElement);
    }
}

5
TPL Dataflow 不了解线程,它使用任务(tasks)来实现。线程池(ThreadPool)与任务相匹配。您可以通过设置自己的“TaskScheduler”来更改默认设置。 - i3arnon
@i3arnon 我写这篇文章的真正原因是将请求线程上完成的工作转移到其他线程,以便释放请求线程并使其继续执行。也许我的方法不正确? - Jeroen Vannevel
@JeroenVannevel 如l3arnon所提到的,TPL Dataflow使用任务,因此它确实释放了发布线程。您的代码过于复杂。向开始块发送简单的Post并等待最后一个块的Completion属性应该就足够了。 - Panagiotis Kanavos
@PanagiotisKanavos 我不想创建大量的任务,所有这些任务都从IIS线程池请求线程。如果我理解正确,通过数据流传递它,我可以限制创建的任务(或线程?)的数量(MaxDegreeParallelism),同时自动排队其他传入的数据。我是否理解错误? - Jeroen Vannevel
可能有几种方式。首先,任务是抽象概念,而不是线程本身。TaskScheduler是将任务分配给线程池中已创建的线程的工具。无论如何,默认情况下每个块都使用自己的任务,最大DOP为1。您必须干预才能将DOP更改为更大的数字。即使MaxDOP=10也并不意味着您会获得10个线程。如果处理足够快,则可能仅使用1个或5个或其他需要处理输入缓冲区的数量。 - Panagiotis Kanavos
显示剩余4条评论
1个回答

12
在将解决方案部署到ASP.NET环境之前,我建议您更改架构:IIS可以在处理请求后为自己的使用挂起ASP.NET中的线程,因此您的任务可能未完成。更好的方法是创建一个单独的Windows服务守护程序,处理您的数据流。
现在回到TPL Dataflow。
我喜欢TPL Dataflow库,但它的文档真的很混乱。 我找到的唯一有用的文件是Introduction to TPL Dataflow
其中有一些线索可能会有所帮助,特别是关于配置设置的线索(如果需要,建议您使用自己的TaskScheduler实现和自己的TheadPool实现以及MaxMessagesPerTask选项进行调查)。
内置的数据流块是可配置的,开发人员可以通过DataflowBlockOptions类及其派生类型(ExecutionDataflowBlockOptionsGroupingDataflowBlockOptions)公开地控制块在何处以及如何执行工作。以下是一些关键的旋钮,这些旋钮都可以在构造时提供给块的实例中进行调整。
  • TaskScheduler 定制化,如 @i3arnon 所提到的:

    默认情况下,数据流块将工作安排给 TaskScheduler.Default,该调度程序针对 .NET ThreadPool 的内部工作。

  • MaxDegreeOfParallelism

    它默认为 1,意味着一次只能在块中发生一件事。如果设置为大于 1 的值,则该数量的消息可以由块并发处理。如果设置为 DataflowBlockOptions.Unbounded (-1),则任何数量的消息都可以被并发处理,最大值由数据流块所针对的底层调度程序自动管理。请注意,MaxDegreeOfParallelism 是一个最大值,而不是一个要求。

  • MaxMessagesPerTask

    TPL Dataflow 专注于效率和控制。在必须在两者之间进行权衡的情况下,系统努力提供质量默认值,同时还允许开发人员根据特定情况自定义行为。其中一个例子是性能和公平之间的权衡。默认情况下,数据流块试图最小化处理所有数据所需的任务对象的数量。这提供了非常有效的执行;只要块有可用于处理的数据,该块的任务将继续处理可用的数据,仅在没有更多数据可用时退役(直到再次有数据可用,此时将启动更多任务)。然而,这可能会导致公平性问题。如果系统当前正在处理来自给定块集的数据,然后数据到达其他块,则后者将需要等待第一个块完成处理才能开始,或者冒着过度订阅系统的风险。这可能是特定情况下的正确行为,也可能不是。为了解决这个问题,存在 MaxMessagesPerTask 选项。 它默认为 DataflowBlockOptions.Unbounded (-1),意味着没有最大值。但是,如果设置为正数,则该数字将表示给定块可以使用单个任务处理的最大消息数量。一旦达到该限制,块必须退出任务并替换为副本以继续处理。这些副本与调度程序安排的所有其他任务一样公平对待,允许块之间实现适度的公平。在极端情况下,如果将 MaxMessagesPerTask 设置为 1,则每条消息将使用一个任务,以实现终极公平,可能会牺牲比其他情况下更多的任务。

  • MaxNumberOfGroups

    分组块能够跟踪它们生成的组数,并在生成了该数量的组后自动完成(拒绝进一步提供的消息)。默认情况下,组数为 DataflowBlockOptions.Unbounded (-1),但可以明确设置为大于 1 的值。

  • CancellationToken

    在数据流块的生命周期中监视此令牌。如果在块完成之前收到取消请求,则块将尽可能礼貌地和快速地停止操作。

  • 贪婪

    默认情况下,目标块是贪婪的,希望所有提供给它们的数据。

  • BoundedCapacity


3
问题并不在于文档,因为Dataflow非常简单。问题实际上在于接受它确实如此简单,不需要任何特殊技巧或设置。Stephen Cleary撰写了一系列入门级博客文章。 - Panagiotis Kanavos
MSN还有一个完整的Dataflow部分,提供了如何指南和演示。 - Panagiotis Kanavos
5
不同意:指南并没有详细讲解块的设置,但我链接的文档有。Stephen Cleary写了一篇很好的入门文章,而不是OP需要的超基础定制。 - VMAtm
@PanagiotisKanavos 更多关于问题的内容:我认为 OP 试图在 ASP.NET 中使用 TPL 是最重要的事情,这可能存在错误,因为线程可以通过 IIS 被挂起。 - VMAtm

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接