数据流:将工作分成小任务并再次分组

14

我需要完成以下工作:

  1. 从数据库获取页面对象
  2. 对于每个页面,获取所有图片并处理它们(I/O绑定,例如上传到CDN)
  3. 如果所有图片都成功处理,则在数据库中标记页面为已处理

由于我需要控制并行处理的页面数量,因此我决定使用TPL数据流:

 ____________________________
|         Data pipe          |
|   BufferBlock<Page>        |
|   BoundedCapacity = 1      |
|____________________________|
              |
 ____________________________
|       Process images       |
| TransformBlock<Page, Page> |
| BoundedCapacity = 1        |
| MaxDegreeOfParallelism = 8 |
|____________________________|
              |
 ____________________________
|        Save page           |
| ActionBlock<Page>          |
| BoundedCapacity = 1        |
| MaxDegreeOfParallelism = 5 |
|____________________________|

现在我需要“处理图像”并行处理图像,但我想限制当前工作中所有并行页面上正在处理的图像数量。

我可以使用TrasnformManyBlock来进行“处理图像”,但是如何将它们聚集回“保存页面”块呢?

         ____________________________
        |         Data pipe          |
        |   BufferBlock<Page>        |
        |   BoundedCapacity = 1      |
        |____________________________|
                      |
     ___________________________________
    |           Load images             |
    | TransformManyBlock<Page, Image[]> |
    | BoundedCapacity = 1               |
    | MaxDegreeOfParallelism = 8        |
    |___________________________________|
      /              |              \
   ______________________________________________
 _|____________________________________________  |
|              Process image                   | |
| TransformBlock<ImageWithPage, ImageWithPage> | |
| BoundedCapacity = 1                          | |
| MaxDegreeOfParallelism = 8                   |_|
|______________________________________________|
      \              |               /
         How to group images by page ?
                     |
        ____________________________
       |        Save page           |
       | ActionBlock<Page>          |
       | BoundedCapacity = 1        |
       | MaxDegreeOfParallelism = 5 |
       |____________________________|

此外,可能会有一张图片无法处理,我不想保存带有失败图片的页面。


问题到底是什么?根据图表,您已经想出来了。将中间步骤的MaxDegreeOfParallelism设置为所需级别。 - usr
我已经编辑了问题,并附上了使用TransformManyBlock的图表。 - Michael Logutov
顺便提一下,如果使用BoundedCapacity配置块时,其值小于MaxDegreeOfParallelism,则并行度将降低到容量的值。换句话说,如果只允许缓冲一个图像,则该块无法同时处理8个图像。 - Theodor Zoulias
3个回答

7
您可以通过记录给定页面的每个图像何时到达,然后在所有图像到达时发送该页面来将它们分组。为了确定这一点,页面需要知道它包含多少图像,但我假设您已经知道了。
在代码中,它可能看起来像这样:
public static IPropagatorBlock<TSplit, TMerged>
    CreaterMergerBlock<TSplit, TMerged>(
    Func<TSplit, TMerged> getMergedFunc, Func<TMerged, int> getSplitCount)
{
    var dictionary = new Dictionary<TMerged, int>();

    return new TransformManyBlock<TSplit, TMerged>(
        split =>
        {
            var merged = getMergedFunc(split);
            int count;
            dictionary.TryGetValue(merged, out count);
            count++;
            if (getSplitCount(merged) == count)
            {
                dictionary.Remove(merged);
                return new[] { merged };
            }

            dictionary[merged] = count;
            return new TMerged[0];
        });
}

使用方法:

var dataPipe = new BufferBlock<Page>();

var splitter = new TransformManyBlock<Page, ImageWithPage>(
    page => page.LoadImages(),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var processImage = new TransformBlock<ImageWithPage, ImageWithPage>(
    image =>
    {
        // process the image here
        return image;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var merger = CreaterMergerBlock(
    (ImageWithPage image) => image.Page, page => page.ImageCount);

var savePage = new ActionBlock<Page>(
    page => /* save the page here */,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

dataPipe.LinkTo(splitter);
splitter.LinkTo(processImage);
processImage.LinkTo(merger);
merger.LinkTo(savePage);

谢谢。这与我已经在思考的相当相似,并使用了Dataflow。但是你不应该使用ConcurrentDictionary吗? - Michael Logutov
1
@MichaelLogutov 这里不需要使用ConcurrentDictionary,因为TransformManyBlock已将MaxDegreeOfParallelism设置为1。这意味着在任何给定时间内,Dictionary都不会被多个线程访问。 - svick
好的,忘记了默认选项设置DOP为1。谢谢。 - Michael Logutov
@svick,我知道已经过了很长时间,但你能否请提供一些有关合并的解释?另请参见我的问题 - Little geek
@svick 真不错!谢谢你! - Gleb

0

.NET平台有一个很好的接口可以表示父子关系,即IGrouping<TKey, TElement>接口。它只是一个IEnumerable,同时还具有一个Key属性。键可以是任何东西,在这种情况下,它可以是需要处理的Page。分组的内容可以是属于每个页面的Image,需要上传。这导致了一个数据流块的想法,可以通过独立处理每个TInput来处理IGrouping<TKey, TInput>对象,然后按分组聚合结果,并最终将它们作为IGrouping<TKey, TOutput>对象输出。以下是此想法的实现:

public static TransformBlock<IGrouping<TKey, TInput>, IGrouping<TKey, TOutput>>
    CreateTransformGroupingBlock<TKey, TInput, TOutput>(
        Func<TKey, TInput, Task<TOutput>> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    dataflowBlockOptions ??= new ExecutionDataflowBlockOptions();

    var actionBlock = new ActionBlock<Task<Task<TOutput>>>(taskTask =>
    {
        // An exception thrown by the following line would cause buggy behavior.
        // According to the documentation it should never fail.
        taskTask.RunSynchronously();
        return taskTask.Unwrap();
    }, dataflowBlockOptions);

    var completionCTS = new CancellationTokenSource();
    _ = actionBlock.Completion
        .ContinueWith(_ => completionCTS.Cancel(), TaskScheduler.Default);

    var transformBlock = new TransformBlock<IGrouping<TKey, TInput>,
        IGrouping<TKey, TOutput>>(async grouping =>
    {
        if (grouping == null) throw new InvalidOperationException("Null grouping.");
        var tasks = new List<Task<TOutput>>();
        foreach (var item in grouping)
        {
            // Create a cold task that will be either executed by the actionBlock,
            // or will be canceled by the completionCTS. This should eliminate
            // any possibility that an awaited task will remain cold forever.
            var taskTask = new Task<Task<TOutput>>(() => transform(grouping.Key, item),
                completionCTS.Token);
            var accepted = await actionBlock.SendAsync(taskTask);
            if (!accepted)
            {
                // The actionBlock has failed.
                // Skip the rest of the items. Pending tasks should still be awaited.
                tasks.Add(Task.FromCanceled<TOutput>(new CancellationToken(true)));
                break;
            }
            tasks.Add(taskTask.Unwrap());
        }
        TOutput[] results = await Task.WhenAll(tasks);
        return results.GroupBy(_ => grouping.Key).Single(); // Convert to IGrouping
    }, dataflowBlockOptions);

    // Cleanup
    _ = transformBlock.Completion
        .ContinueWith(_ => actionBlock.Complete(), TaskScheduler.Default);
    _ = Task.WhenAll(actionBlock.Completion, transformBlock.Completion)
        .ContinueWith(_ => completionCTS.Dispose(), TaskScheduler.Default);

    return transformBlock;
}

// Overload with synchronous lambda
public static TransformBlock<IGrouping<TKey, TInput>, IGrouping<TKey, TOutput>>
    CreateTransformGroupingBlock<TKey, TInput, TOutput>(
        Func<TKey, TInput, TOutput> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    return CreateTransformGroupingBlock<TKey, TInput, TOutput>(
        (key, item) => Task.FromResult(transform(key, item)), dataflowBlockOptions);
}

此实现由两个块组成,一个 TransformBlock 处理分组,一个内部的 ActionBlock 处理单个项。两个块都使用相同的用户提供选项进行配置。 TransformBlock 逐个发送要处理的项到 ActionBlock,然后等待结果,最后使用以下巧妙的行构造输出 IGrouping<TKey, TOutput>

return results.GroupBy(_ => grouping.Key).Single(); // Convert to IGrouping

这弥补了当前在.NET平台上没有公开可用的实现IGrouping接口的类的事实。 GroupBy+Single组合可以解决问题,但它有一个限制,即不允许创建空的IGrouping。 如果这是一个问题,那么创建一个实现此接口的类始终是一个选项。 实现一个非常简单(这里是一个例子)。

CreateTransformGroupingBlock方法的使用示例:

var processPages = new TransformBlock<Page, IGrouping<Page, Image>>(page =>
{
    Image[] images = GetImagesFromDB(page);
    return images.GroupBy(_ => page).Single(); // Convert to IGrouping
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var uploadImages = CreateTransformGroupingBlock<Page, Image, Image>(async (page, image) =>
{
    await UploadImage(image);
    return image;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var savePages = new ActionBlock<IGrouping<Page, Image>>(grouping =>
{
    var page = grouping.Key;
    foreach (var image in grouping) SaveImageToDB(image, page);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

processPages.LinkTo(uploadImages);
uploadImages.LinkTo(savePages);

uploadImages 变量的类型是 TransformBlock<IGrouping<Page, Image>, IGrouping<Page, Image>>。在这个例子中,TInputTOutput 的类型相同,因为图片不需要进行转换。


-1
考虑将“加载图像”和“处理图像”合并为一个TransformBlock块。这样,您就不会遇到将单个页面的图像保持在一起的问题。
为了实现并发限制目标,请使用SemaphoreSlim:
SemaphoreSlim processImageDopLimiter = new SemaphoreSlim(8);

//...

var page = ...; //TransformBlock<Page, MyPageAndImageDTO> block input
var images = GetImages(page);
ImageWithPage[] processedImages =
 images
 .AsParallel()
 .Select(i => {
    processImageDopLimiter.WaitOne();
    var result = ProcessImage(i);
    processImageDopLimiter.ReleaseOne();
    return result;
 })
 .ToList();
return new { page, processedImages };

这将导致相当多的线程被阻塞等待。如果您愿意,可以使用此处理的异步版本。这与问题无关。


1
我希望有更多使用Dataflow方式的编程,因为我认为通过阻塞任务来人为限制DOP就像欺骗Dataflow一样。而Dataflow的主要目标是对工作负载进行编排。这样做是否会导致Dataflow无法正确地调整自身以适应工作负载呢? - Michael Logutov
1
好问题。数据流不会调整任何东西。它会将您指定的 DOP 最大化。就像我们的自定义解决方案一样。请注意,您可以说 await semaphoreSlim.WaitOneAsync() 并且在等待时不会阻塞任何线程。在内部,信号量具有队列,并且等待者被放置在其中。我看不出手动限制 DOP 和使用 Dataflow 之间的根本区别。您是否看到任何具体的劣势?实际上,在这里根本不需要使用 Dataflow,因为信号量可以完成所有这些操作。但是使用 Dataflow 似乎有点合理。 - usr
我建议使用TransformBlock。我之前的编辑是不完整的。已经修复了。 我们的进展很慢。您是否还有任何疑虑,阻止您采纳这个解决方案? - usr
好吧,我想这目前是唯一的解决方案。另一个是使用自定义调度程序,它的核心(我认为)会类似。 - Michael Logutov
我从未谈论过线程阻塞。我们并没有直接使用Thread类 - 你为什么要谈论它呢?当然,我知道我们不会阻塞线程 - 这是在使用任务时的一个前提条件,除非你回到Thead API。我说的是通过任务浪费线程池中的插槽。你创建了比所需更多的Task类。这就是你解决方案的问题所在。 - Michael Logutov
显示剩余10条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接