如何在多个TPL Dataflow块之间跨度调整MaxDegreeOfParallelism?

4
我希望限制我在所有Dataflow块中提交给数据库服务器的查询总数为30。 在以下场景中,每个块的30个并发任务限制是针对每个块的,因此在执行期间始终会达到60个并发任务。 显然,我可以将我的并行性限制为每个块的15个,以实现系统范围内的总计30个,但这并不是最优解。
如何使其工作? 我是否使用SemaphoreSlim等来限制(和阻止)我的awaits,还是有更好的内在Dataflow方法?
public class TPLTest
{
    private long AsyncCount = 0;
    private long MaxAsyncCount = 0;
    private long TaskId = 0;
    private object MetricsLock = new object();

    public async Task Start()
    {
        ExecutionDataflowBlockOptions execOption
            = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 30 };
        DataflowLinkOptions linkOption = new DataflowLinkOptions()
            { PropagateCompletion = true };

        var doFirstIOWorkAsync = new TransformBlock<Data, Data>(
            async data => await DoIOBoundWorkAsync(data), execOption);
        var doCPUWork = new TransformBlock<Data, Data>(
            data => DoCPUBoundWork(data));
        var doSecondIOWorkAsync = new TransformBlock<Data, Data>(
            async data => await DoIOBoundWorkAsync(data), execOption);
        var doProcess = new TransformBlock<Data, string>(
            i => $"Task finished, ID = : {i.TaskId}");
        var doPrint = new ActionBlock<string>(
            s => Debug.WriteLine(s));

        doFirstIOWorkAsync.LinkTo(doCPUWork, linkOption);
        doCPUWork.LinkTo(doSecondIOWorkAsync, linkOption);
        doSecondIOWorkAsync.LinkTo(doProcess, linkOption);
        doProcess.LinkTo(doPrint, linkOption);

        int taskCount = 150;
        for (int i = 0; i < taskCount; i++)
        {
            await doFirstIOWorkAsync.SendAsync(new Data() { Delay = 2500 });
        }
        doFirstIOWorkAsync.Complete();

        await doPrint.Completion;
        Debug.WriteLine("Max concurrent tasks: " + MaxAsyncCount.ToString());
    }

    private async Task<Data> DoIOBoundWorkAsync(Data data)
    {
        lock(MetricsLock)
        {
            AsyncCount++;
            if (AsyncCount > MaxAsyncCount)
                MaxAsyncCount = AsyncCount;
        }

        if (data.TaskId <= 0)
            data.TaskId = Interlocked.Increment(ref TaskId);

        await Task.Delay(data.Delay);

        lock (MetricsLock)
            AsyncCount--;

        return data;
    }

    private Data DoCPUBoundWork(Data data)
    {
        data.Step = 1;
        return data;
    }
}

数据类:

public class Data
{
    public int Delay { get; set; }
    public long TaskId { get; set; }
    public int Step { get; set; }
}

起点:

TPLTest tpl = new TPLTest();
await tpl.Start();

最好使用信号量,因为它允许您仅在执行每个查询时锁定,并且不会影响该块中涉及的任何其他工作。创建一个抽象层,用于运行查询(或更改现有抽象层,如果您已经有一个),以便在查询时抽象层获取信号量,这样您就不必每次手动执行它。 - Servy
我同意@Servy的观点。我建议使用SemaphoreSlim来限制对数据库的访问。这样,您将拥有一个单一的控制点来控制如何访问数据库,而不是依赖于DataFlow中的细节。 - Nick
如果任何块执行查找操作,最好预加载和缓存查找值,就像SSIS一样。如果块执行多个活动,请将它们分开以允许每个块一次只执行一个作业,而不会阻塞太长时间。 - Panagiotis Kanavos
@PanagiotisKanavos 我的数据库服务器有40个内核,使用具有许多SSD磁轮的SAN。选择30个并发查询的限制是因为我的测试表明这是最多的数量,超过这个数量会对其他用户造成不利影响。我提供了我的过程的简化示例。我有8个I/O块,它们按顺序链接,并且每个块之间都有CPU绑定处理。初始数据已预加载,包含10,000多条记录,但随着数据通过每个链接的I/O块传播,会生成约2500万个下游记录。 - NPCampbell
@NPCampbell,你现在还没有解释你的工作是什么。在ETL场景中,“CPU bound”和“IO bound”并不能说明问题 - ETL数据流中的所有操作都是IO bound,大多数操作都是按顺序链接的,很少有操作会因为通常情况下数据库可以更好地处理数据而变成CPU bound。将数据存储在数据库中,并使用查询来生成组合可能比将数据移动到SSIS或其他数据流要快得多。 - Panagiotis Kanavos
显示剩余8条评论
3个回答

3

为什么不将所有内容编组到具有实际限制的操作块中?

var count = 0;
var ab1 = new TransformBlock<int, string>(l => $"1:{l}");
var ab2 = new TransformBlock<int, string>(l => $"2:{l}");
var doPrint = new ActionBlock<string>(
    async s =>
    {
        var c = Interlocked.Increment(ref count);
        Console.WriteLine($"{c}:{s}");
        await Task.Delay(5);
        Interlocked.Decrement(ref count);
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 15 });

ab1.LinkTo(doPrint);
ab2.LinkTo(doPrint);

for (var i = 100; i > 0; i--)
{
    if (i % 3 == 0) await ab1.SendAsync(i);
    if (i % 5 == 0) await ab2.SendAsync(i);
}

ab1.Complete();
ab2.Complete();

await ab1.Completion;
await ab2.Completion;

1
谁给这个点踩了,为什么?这实际上是处理ETL的正确方式。即使使用信号量,30个被阻止的连接也比1个尽可能快地传输数据更糟糕。 - Panagiotis Kanavos
我提供了一个不太好的例子。我的输入/输出块以前一个输入/输出块的输出为输入,经过一些微小的处理后进行操作。这些块实际上并不相同。有些执行查找操作,有些检索数据集,还有一些执行存储过程。我能够使用一个返回 DataSet 的通用块来实现这种方法,但不能用于所有我的输入/输出块类型。 - NPCampbell
2
@NPCampbell,那不应该是问题。doPrint块甚至可以什么都不做,除了限制同时调用的数量。这也是一个薄弱的示例,展示了解决方案的路径。 - Paulo Morgado
@PauloMorgado 我对你的doPrint块注释很感兴趣。我有大约10,000个元素通过我的8个I/O块的管道。我的理解是,虚拟的doPrint块可以限制通过管道该点的元素数量,但它不会限制上游或下游的数据库活动。我的理解正确吗? - NPCampbell
它会限制下游的所有内容,因为只有那么多操作会被执行。我所说的“愚蠢”并不是指“什么都不做”,而是指“不做聪明的事情”。 - Paulo Morgado

0
这是我最终采用的解决方案(除非我能想出如何使用单个通用的DataFlow块来编排每种类型的数据库访问):
我在类级别上定义了一个SemaphoreSlim:
private SemaphoreSlim ThrottleDatabaseQuerySemaphore = new SemaphoreSlim(30, 30);

我修改了I/O类以调用一个节流类:

    private async Task<Data> DoIOBoundWorkAsync(Data data)
    {
        if (data.TaskId <= 0)
            data.TaskId = Interlocked.Increment(ref TaskId);

        Task t = Task.Delay(data.Delay); ;
        await ThrottleDatabaseQueryAsync(t);

        return data;
    }

限流类:(我还有一个通用版本的限流例程,因为我无法想出一种例程来处理Task和Task<TResult>)

    private async Task ThrottleDatabaseQueryAsync(Task task)
    {
        await ThrottleDatabaseQuerySemaphore.WaitAsync();
        try
        {
            lock (MetricsLock)
            {
                AsyncCount++;
                if (AsyncCount > MaxAsyncCount)
                    MaxAsyncCount = AsyncCount;
            }

            await task;
        }
        finally
        {
            ThrottleDatabaseQuerySemaphore.Release();

            lock (MetricsLock)
                AsyncCount--;
        }
    }
}

这将数据库并发问题转移到客户端。但这并不能解决它们。使用“通用”的数据导入步骤很容易 - 创建一个接受记录数组并使用SqlBulkCopy类将其写入表格的ActionBlock。为了保持简单,为每个目标表创建一个这样的块。在每个块之前添加BatchBlock以将记录批处理为5000或10000个数组。 - Panagiotis Kanavos
1
更高级的版本是使用自定义块将记录批处理并添加到包含记录和目标表名称的DTO中。将它们传递给ActionBlock,该块使用表名配置SqlBulkCopy实例并将记录写出。这将为您提供单个数据库编写器,但“分组”块必须处理每个目标表的不同批处理大小,以应对快速和慢速表。 - Panagiotis Kanavos
此外,DataFlow、Reactive Extensions 和 tasks 可以组合在一起。源块和目标块可以作为 observables 和 observers。您可以使用 Rx 操作,如“Group By”和“Buffer”,按目标分离记录并按计数和时间批处理它们。 - Panagiotis Kanavos
我不得不在客户端进行限流,因为我的应用程序的行为就像是拒绝服务攻击。目前,我在输入端批处理记录以防止在处理过程中耗尽内存。80%的性能瓶颈都在读取端。在写入方面,我可以使用SqlBulkCopy来节省一些时间,但是删除目标表上的所有索引使其非常高效。我会采纳您关于查看Reactive Extensions的建议。我买了Stephen Cleary的书,但还没有看到那里。 - NPCampbell
我不得不在客户端进行限流,因为我的应用程序的行为就像是一次拒绝服务攻击。这就是我所说的。这不是一个并行处理问题,而是一个数据工程问题。 - Panagiotis Kanavos
ThrottleDatabaseQueryAsync 正在限制 taskawaiting,这不是你想要的。你想要限制当前正在运行的任务数量。你应该有一个参数 Func<Task> taskFactory,而不是一个 Task task - Theodor Zoulias

0

解决这个问题最简单的方法是使用一个limited-concurrencyTaskScheduler来配置所有的块:

TaskScheduler scheduler = new ConcurrentExclusiveSchedulerPair(
    TaskScheduler.Default, maxConcurrencyLevel: 30).ConcurrentScheduler;

ExecutionDataflowBlockOptions execOption = new()
{
    TaskScheduler = scheduler,
    MaxDegreeOfParallelism = scheduler.MaximumConcurrencyLevel,
};

TaskScheduler只能限制在线程上执行的工作并发性。它们无法限制不在线程上运行的异步操作。因此,为了强制执行{{link3:MaximumConcurrencyLevel}}策略,不幸的是您必须将同步委托传递给所有Dataflow块。例如:

TransformBlock<Data, Data> doFirstIOWorkAsync = new(data =>
{
    return DoIOBoundWorkAsync(data).GetAwaiter().GetResult();
}, execOption);

这个改变将增加对ThreadPool线程的需求,所以你最好将ThreadPool在需求上立即创建的线程数量增加到比默认值Environment.ProcessorCount更高的值:
ThreadPool.SetMinThreads(100, 100); // At the start of the program

我提出这个解决方案并不是因为它最优,而是因为它容易实现。我的理解是,在你所处理的应用程序中,浪费一些内存在大约30个线程上,这些线程大部分时间都会被阻塞,不会对系统产生任何可测量的负面影响。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接