我希望限制我在所有Dataflow块中提交给数据库服务器的查询总数为30。 在以下场景中,每个块的30个并发任务限制是针对每个块的,因此在执行期间始终会达到60个并发任务。 显然,我可以将我的并行性限制为每个块的15个,以实现系统范围内的总计30个,但这并不是最优解。
如何使其工作? 我是否使用SemaphoreSlim等来限制(和阻止)我的awaits,还是有更好的内在Dataflow方法?
如何使其工作? 我是否使用SemaphoreSlim等来限制(和阻止)我的awaits,还是有更好的内在Dataflow方法?
public class TPLTest
{
private long AsyncCount = 0;
private long MaxAsyncCount = 0;
private long TaskId = 0;
private object MetricsLock = new object();
public async Task Start()
{
ExecutionDataflowBlockOptions execOption
= new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 30 };
DataflowLinkOptions linkOption = new DataflowLinkOptions()
{ PropagateCompletion = true };
var doFirstIOWorkAsync = new TransformBlock<Data, Data>(
async data => await DoIOBoundWorkAsync(data), execOption);
var doCPUWork = new TransformBlock<Data, Data>(
data => DoCPUBoundWork(data));
var doSecondIOWorkAsync = new TransformBlock<Data, Data>(
async data => await DoIOBoundWorkAsync(data), execOption);
var doProcess = new TransformBlock<Data, string>(
i => $"Task finished, ID = : {i.TaskId}");
var doPrint = new ActionBlock<string>(
s => Debug.WriteLine(s));
doFirstIOWorkAsync.LinkTo(doCPUWork, linkOption);
doCPUWork.LinkTo(doSecondIOWorkAsync, linkOption);
doSecondIOWorkAsync.LinkTo(doProcess, linkOption);
doProcess.LinkTo(doPrint, linkOption);
int taskCount = 150;
for (int i = 0; i < taskCount; i++)
{
await doFirstIOWorkAsync.SendAsync(new Data() { Delay = 2500 });
}
doFirstIOWorkAsync.Complete();
await doPrint.Completion;
Debug.WriteLine("Max concurrent tasks: " + MaxAsyncCount.ToString());
}
private async Task<Data> DoIOBoundWorkAsync(Data data)
{
lock(MetricsLock)
{
AsyncCount++;
if (AsyncCount > MaxAsyncCount)
MaxAsyncCount = AsyncCount;
}
if (data.TaskId <= 0)
data.TaskId = Interlocked.Increment(ref TaskId);
await Task.Delay(data.Delay);
lock (MetricsLock)
AsyncCount--;
return data;
}
private Data DoCPUBoundWork(Data data)
{
data.Step = 1;
return data;
}
}
数据类:
public class Data
{
public int Delay { get; set; }
public long TaskId { get; set; }
public int Step { get; set; }
}
起点:
TPLTest tpl = new TPLTest();
await tpl.Start();
SemaphoreSlim
来限制对数据库的访问。这样,您将拥有一个单一的控制点来控制如何访问数据库,而不是依赖于DataFlow中的细节。 - Nick