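Setup 1
When I configure each block with MaxDegreeOfParallelism = Environment.ProcessorCount (8 in my case), I notice that the buffer is filled from multiple threads, and the second block does not start processing until roughly 1700 elements have been received across all threads. You can see it in action here.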
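Setup 2
When I set MaxDegreeOfParallelism = 1, I notice that all elements are received on a single thread, and sending has already started after roughly 40 elements have been received. The data is here.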
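Setup 3
When I set MaxDegreeOfParallelism = 1 and introduce a 1000 ms delay before sending each input, I notice that each element is sent as soon as it is received, and every received element is handled on a separate thread. The data is here.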
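The setups above differ only in MaxDegreeOfParallelism and in the delay between inputs. For anyone who wants to reproduce the thread observations without the console noise, here is a reduced sketch. It is not part of my actual code: SetupSketch and CountThreads are names made up for illustration, and it uses a single ActionBlock instead of the two-step pipeline. It posts a number of items and reports how many distinct managed threads processed them. The result depends on the thread pool, so I would not rely on any particular count.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;

public static class SetupSketch
{
    // Hypothetical helper: returns the number of distinct threads that handled the items.
    public static int CountThreads(int maxDegreeOfParallelism, int itemCount)
    {
        // Used as a thread-safe set of managed thread IDs.
        var threadIds = new ConcurrentDictionary<int, bool>();

        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism
        };

        var block = new ActionBlock<int>(_ =>
        {
            threadIds.TryAdd(Environment.CurrentManagedThreadId, true);
        }, options);

        for (var i = 0; i < itemCount; i++)
        {
            block.Post(i);
        }

        // Signal that no more input is coming, then wait for the queue to drain.
        block.Complete();
        block.Completion.Wait();

        return threadIds.Count;
    }

    public static void Main()
    {
        Console.WriteLine($"Serial:   {CountThreads(1, 1000)} thread(s)");
        Console.WriteLine($"Parallel: {CountThreads(Environment.ProcessorCount, 1000)} thread(s)");
    }
}
```

Even with MaxDegreeOfParallelism = 1 the count may be greater than one, because the limit only says that one element is processed at a time, not that the same thread is always used.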
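So much for the setups. My questions are as follows:
When I compare setups 1 and 2, I notice that elements start being processed much sooner in the serial case than in the parallel one (even after accounting for the parallel case having 8 times as many threads). What causes this difference?
Since this will run in an ASP.NET environment, I don't want to spawn unnecessary threads, because they all come from a single thread pool. As setup 3 shows, the work is still spread over multiple threads even when there is only a handful of data. This is also surprising, because from setup 1 I would have assumed that data is handed to the threads sequentially (notice how the first 50 elements all go to thread 16). Can I make sure it only creates new threads when they are needed?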
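There is another concept called BufferBlock<T>. If the TransformBlock<T> already queues its input, what would be the practical difference of replacing the first step in my pipeline (ReceiveElement) with a BufferBlock?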
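To make the BufferBlock<T> question concrete, here is a minimal sketch (not the pipeline from this post) that puts a BufferBlock<string> in front of an ActionBlock<string>. The names BufferBlockSketch and Run are made up for illustration. As far as I understand it, a BufferBlock<T> takes no delegate and only stores and forwards messages, whereas a TransformBlock runs a function on each message in addition to queuing its input and output.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class BufferBlockSketch
{
    // Hypothetical helper: pushes the inputs through buffer -> action
    // and returns the elements in the order the action saw them.
    public static List<string> Run(IEnumerable<string> inputs)
    {
        var seen = new List<string>();

        // No delegate here: the block is a FIFO queue that offers items to linked targets.
        var buffer = new BufferBlock<string>();

        // Default options mean MaxDegreeOfParallelism = 1, so the list is
        // only ever touched by one element handler at a time.
        var send = new ActionBlock<string>(element => seen.Add(element));

        // PropagateCompletion forwards Complete() and faults from buffer to send.
        buffer.LinkTo(send, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var input in inputs)
        {
            buffer.Post(input);
        }

        buffer.Complete();
        send.Completion.Wait();
        return seen;
    }

    public static void Main()
    {
        foreach (var element in Run(new[] { "a", "b", "c" }))
        {
            Console.WriteLine(element);
        }
    }
}
```

In this sketch the only visible difference from my current first step is that the per-element delegate (and with it the logging in ReceiveElement) disappears. Whether it changes the threading behaviour is exactly what I am asking.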
// Namespaces needed by both classes when they live in one file.
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static void Main(string[] args)
    {
        var dataflowProcessor = new DataflowProcessor<string>();
        var amountOfTasks = 5;
        var tasks = new Task[amountOfTasks];

        for (var i = 0; i < amountOfTasks; i++)
        {
            tasks[i] = SpawnThread(dataflowProcessor, $"Task {i + 1}");
        }

        // Blocks until every feeder has posted all of its elements.
        Task.WaitAll(tasks);
        Console.WriteLine("Finished feeding threads");
        Console.Read();
    }

    // Task.Run returns a task that completes only when FeedData completes.
    // Wrapping an async lambda in new Task(...) produces an async void delegate,
    // so Task.WaitAll would return at the first await instead.
    private static Task SpawnThread(DataflowProcessor<string> dataflowProcessor, string taskName)
    {
        return Task.Run(() => FeedData(dataflowProcessor, taskName));
    }

    private static async Task FeedData(DataflowProcessor<string> dataflowProcessor, string threadName)
    {
        foreach (var i in Enumerable.Range(0, short.MaxValue))
        {
            await Task.Delay(1000); // Only used for the delayedSerialProcessing test
            dataflowProcessor.Process($"Thread name: {threadName}\t Thread ID:{Thread.CurrentThread.ManagedThreadId}\t Value:{i}");
        }
    }
}
public class DataflowProcessor<T>
{
    // Shared by both blocks; set MaxDegreeOfParallelism to 1 for the serial setups.
    private static readonly ExecutionDataflowBlockOptions ExecutionOptions = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount
    };

    // First step: logs the receiving thread and passes the element through unchanged.
    private static readonly TransformBlock<T, T> ReceiveElement = new TransformBlock<T, T>(element =>
    {
        Console.WriteLine($"Processing received element in thread {Thread.CurrentThread.ManagedThreadId}");
        return element;
    }, ExecutionOptions);

    // Second step: logs the sending thread and prints the element.
    private static readonly ActionBlock<T> SendElement = new ActionBlock<T>(element =>
    {
        Console.WriteLine($"Processing sent element in thread {Thread.CurrentThread.ManagedThreadId}");
        Console.WriteLine(element);
    }, ExecutionOptions);

    static DataflowProcessor()
    {
        ReceiveElement.LinkTo(SendElement);

        // Propagate completion or failure of the first block to the second one.
        ReceiveElement.Completion.ContinueWith(x =>
        {
            if (x.IsFaulted)
            {
                ((IDataflowBlock) SendElement).Fault(x.Exception);
            }
            else
            {
                SendElement.Complete();
            }
        });
    }

    public void Process(T newElement)
    {
        ReceiveElement.Post(newElement);
    }
}
MaxDegreeParallelism
), while automatically queuing any other incoming data. Am I misunderstanding this? - Jeroen Vannevel