如果
TransformBlock
有一个
ProcessingCompleted
事件,当块完成其队列中所有消息的处理时触发,那将是很好的,但是实际上并没有这样的事件。下面是一种尝试纠正这个遗漏的方法。
CreateTransformBlockEx
方法接受一个
Action<Exception>
处理程序,当此“事件”发生时调用该处理程序。
本意是在块最终完成之前始终调用处理程序。不幸的是,在提供的
CancellationToken
被取消的情况下,完成(取消)会首先发生,处理程序稍后几毫秒被调用。要解决此不一致性需要一些棘手的解决方法,并且可能会产生其他不必要的副作用,因此我将其保留为原样。
public static IPropagatorBlock<TInput, TOutput>
CreateTransformBlockEx<TInput, TOutput>(Func<TInput, Task<TOutput>> transform,
Action<Exception> onProcessingCompleted,
ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
if (onProcessingCompleted == null)
throw new ArgumentNullException(nameof(onProcessingCompleted));
dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();
var transformBlock = new TransformBlock<TInput, TOutput>(transform,
dataflowBlockOptions);
var bufferBlock = new BufferBlock<TOutput>(dataflowBlockOptions);
transformBlock.LinkTo(bufferBlock);
PropagateCompletion(transformBlock, bufferBlock, onProcessingCompleted);
return DataflowBlock.Encapsulate(transformBlock, bufferBlock);
async void PropagateCompletion(IDataflowBlock block1, IDataflowBlock block2,
Action<Exception> completionHandler)
{
try
{
await block1.Completion.ConfigureAwait(false);
}
catch { }
var exception =
block1.Completion.IsFaulted ? block1.Completion.Exception : null;
try
{
// Invoke the handler before completing the second block
completionHandler(exception);
}
finally
{
if (exception != null) block2.Fault(exception); else block2.Complete();
}
}
}
// Overload with synchronous lambda
public static IPropagatorBlock<TInput, TOutput>
CreateTransformBlockEx<TInput, TOutput>(Func<TInput, TOutput> transform,
Action<Exception> onProcessingCompleted,
ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
return CreateTransformBlockEx<TInput, TOutput>(
x => Task.FromResult(transform(x)), onProcessingCompleted,
dataflowBlockOptions);
}
当使用PropagateCompletion = true
选项调用时,本地函数PropagateCompletion
的代码模仿了源代码中的LinkTo
内置方法。
使用示例:
var httpClient = new HttpClient();
var downloader = CreateTransformBlockEx<string, string>(async url =>
{
return await httpClient.GetStringAsync(url);
}, onProcessingCompleted: ex =>
{
Console.WriteLine($"Download completed {(ex == null ? "OK" : "Error")}");
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 10
});
Completion
属性上继续。 - casperOne