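.NET has a convenient interface for representing a parent-child relationship: IGrouping<TKey, TElement>. It is simply an IEnumerable that also has a Key property. The key can be anything, and in this case it can be the Page that needs to be processed. The contents of the grouping can be the Images that belong to each page and need to be uploaded. This leads to the idea of a dataflow block that processes IGrouping<TKey, TInput> objects by handling each TInput independently, then aggregates the results per grouping, and finally emits them as IGrouping<TKey, TOutput> objects. Here is an implementation of this idea: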

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static TransformBlock<IGrouping<TKey, TInput>, IGrouping<TKey, TOutput>>
        CreateTransformGroupingBlock<TKey, TInput, TOutput>(
            Func<TKey, TInput, Task<TOutput>> transform,
            ExecutionDataflowBlockOptions dataflowBlockOptions = null)
    {
        if (transform == null) throw new ArgumentNullException(nameof(transform));
        dataflowBlockOptions ??= new ExecutionDataflowBlockOptions();

        // Inner block: runs the cold tasks that wrap the individual items.
        var actionBlock = new ActionBlock<Task<Task<TOutput>>>(taskTask =>
        {
            // An exception thrown by the following line would cause buggy behavior.
            // According to the documentation it should never fail.
            taskTask.RunSynchronously();
            return taskTask.Unwrap();
        }, dataflowBlockOptions);

        // Cancels any still-cold tasks once the inner block completes.
        var completionCTS = new CancellationTokenSource();
        _ = actionBlock.Completion
            .ContinueWith(_ => completionCTS.Cancel(), TaskScheduler.Default);

        // Outer block: receives the groupings and dispatches their items.
        var transformBlock = new TransformBlock<IGrouping<TKey, TInput>,
            IGrouping<TKey, TOutput>>(async grouping =>
        {
            if (grouping == null) throw new InvalidOperationException("Null grouping.");
            var tasks = new List<Task<TOutput>>();
            foreach (var item in grouping)
            {
                // Create a cold task that will be either executed by the actionBlock,
                // or will be canceled by the completionCTS. This should eliminate
                // any possibility that an awaited task will remain cold forever.
                var taskTask = new Task<Task<TOutput>>(() => transform(grouping.Key, item),
                    completionCTS.Token);
                var accepted = await actionBlock.SendAsync(taskTask);
                if (!accepted)
                {
                    // The actionBlock has failed.
                    // Skip the rest of the items. Pending tasks should still be awaited.
                    tasks.Add(Task.FromCanceled<TOutput>(new CancellationToken(true)));
                    break;
                }
                tasks.Add(taskTask.Unwrap());
            }
            TOutput[] results = await Task.WhenAll(tasks);
            return results.GroupBy(_ => grouping.Key).Single(); // Convert to IGrouping
        }, dataflowBlockOptions);

        // Cleanup
        _ = transformBlock.Completion
            .ContinueWith(_ => actionBlock.Complete(), TaskScheduler.Default);
        _ = Task.WhenAll(actionBlock.Completion, transformBlock.Completion)
            .ContinueWith(_ => completionCTS.Dispose(), TaskScheduler.Default);

        return transformBlock;
    }

    // Overload with synchronous lambda
    public static TransformBlock<IGrouping<TKey, TInput>, IGrouping<TKey, TOutput>>
        CreateTransformGroupingBlock<TKey, TInput, TOutput>(
            Func<TKey, TInput, TOutput> transform,
            ExecutionDataflowBlockOptions dataflowBlockOptions = null)
    {
        if (transform == null) throw new ArgumentNullException(nameof(transform));
        return CreateTransformGroupingBlock<TKey, TInput, TOutput>(
            (key, item) => Task.FromResult(transform(key, item)), dataflowBlockOptions);
    }

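This implementation consists of two blocks: a TransformBlock that processes the groupings and an internal ActionBlock that processes the individual items. Both are configured with the same user-supplied options. The TransformBlock sends the items to the ActionBlock one by one, then waits for the results, and finally constructs the output IGrouping<TKey, TOutput> with the following tricky line:

    return results.GroupBy(_ => grouping.Key).Single();

This compensates for the fact that .NET currently has no publicly available class that implements the IGrouping interface. The GroupBy+Single combination does the trick, but it has one limitation: it cannot create an empty IGrouping. If that is a problem, creating a class that implements this interface is always an option. Implementing one is quite simple (here is an example).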
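
A minimal sketch of such a class, in case the empty-grouping limitation matters. The name Grouping<TKey, TElement> and its constructor are made up for illustration; nothing like it ships publicly with .NET, and this is only one of several reasonable ways to write it:

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper class (an assumption for this sketch, not a .NET type).
// It pairs a key with a fixed set of elements and may legitimately be empty.
public sealed class Grouping<TKey, TElement> : IGrouping<TKey, TElement>
{
    private readonly IReadOnlyList<TElement> _elements;

    public Grouping(TKey key, IEnumerable<TElement> elements)
    {
        if (elements == null) throw new ArgumentNullException(nameof(elements));
        Key = key;
        // Take a snapshot, so later changes to the source are not observed.
        _elements = elements.ToArray();
    }

    public TKey Key { get; }

    public IEnumerator<TElement> GetEnumerator() => _elements.GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
```

With a class like this, the last line of the transform lambda could become `return new Grouping<TKey, TOutput>(grouping.Key, results);`, which also works when `results` is empty, whereas `GroupBy(...).Single()` throws an InvalidOperationException on an empty sequence.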
Usage example of the CreateTransformGroupingBlock method:

    var processPages = new TransformBlock<Page, IGrouping<Page, Image>>(page =>
    {
        Image[] images = GetImagesFromDB(page);
        return images.GroupBy(_ => page).Single();
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

    var uploadImages = CreateTransformGroupingBlock<Page, Image, Image>(async (page, image) =>
    {
        await UploadImage(image);
        return image;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

    var savePages = new ActionBlock<IGrouping<Page, Image>>(grouping =>
    {
        var page = grouping.Key;
        foreach (var image in grouping) SaveImageToDB(image, page);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

    processPages.LinkTo(uploadImages);
    uploadImages.LinkTo(savePages);

The uploadImages variable is of type TransformBlock<IGrouping<Page, Image>, IGrouping<Page, Image>>. In this example TInput and TOutput are the same type, because the images do not need to be transformed.
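If you configure a block with a BoundedCapacity smaller than its MaxDegreeOfParallelism, the degree of parallelism is reduced to the value of the capacity. In other words, a block that is allowed to buffer only one image cannot process 8 images at the same time. - Theodor Zoulias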
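
The point made in the comment above can be checked with a small experiment. The sketch below is only illustrative: BoundedCapacityDemo and MeasurePeakConcurrencyAsync are hypothetical names, and the 50 ms delay is an arbitrary stand-in for real work such as an upload. It feeds items into an ActionBlock and records how many were in flight at the same time:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class (an assumption for this sketch).
public static class BoundedCapacityDemo
{
    // Returns the highest number of items observed being processed concurrently.
    public static async Task<int> MeasurePeakConcurrencyAsync(
        int boundedCapacity, int maxDegreeOfParallelism, int itemCount = 20)
    {
        var gate = new object();
        int current = 0, peak = 0;

        var block = new ActionBlock<int>(async _ =>
        {
            lock (gate)
            {
                current++;
                if (current > peak) peak = current;
            }
            await Task.Delay(50); // Simulated work; the duration is arbitrary
            lock (gate) { current--; }
        }, new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = boundedCapacity,
            MaxDegreeOfParallelism = maxDegreeOfParallelism
        });

        // SendAsync waits while the block is full, so no item is dropped.
        for (int i = 0; i < itemCount; i++) await block.SendAsync(i);
        block.Complete();
        await block.Completion;
        return peak;
    }
}
```

Calling `await BoundedCapacityDemo.MeasurePeakConcurrencyAsync(boundedCapacity: 2, maxDegreeOfParallelism: 8)` should return a value no greater than 2, since items that are being processed count against the capacity. If the full parallelism is wanted, the capacity has to be at least as large as the MaxDegreeOfParallelism.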