TPL数据流,我能查询标记为完成但尚未完成的数据块吗?

5

考虑以下内容:

BufferBlock<int> sourceBlock = new BufferBlock<int>();
TransformBlock<int, int> targetBlock = new TransformBlock<int, int>(element =>
{
    return element * 2;
});

sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { PropagateCompletion = true });

//feed some elements into the buffer block
for(int i = 1; i <= 1000000; i++)
{
    sourceBlock.SendAsync(i);
}

sourceBlock.Complete();

targetBlock.Completion.ContinueWith(_ =>
{
    //notify completion of the target block
});
targetBlock 似乎从未完成,我认为原因是 TransformBlock 中的所有项都在输出队列中等待,因为我没有将 targetBlock 链接到任何其他 Dataflow 块。然而,我实际想要实现的是:当 (A) targetBlock 收到完成通知并且 (B) 输入队列为空时通知我。我不关心 TransformBlock 的输出队列中是否仍有项目。我该如何做?唯一能达到我的目的的方法是查询 sourceBlock 的完成状态并确保 targetBlockInputCount 为零吗?我不确定这种方法是否非常稳定(如果 sourceBlock 的最后一个项目已传递给 targetBlock,那么 sourceBlock 是否真正标记为已完成?)。是否有更优雅、更高效的方法来实现同样的目标?
编辑:我刚刚注意到即使检查 sourceBlock 的完成状态和 targetBlockInputCount 是否为零的“脏”方法也不容易实现。那个块会在哪里?它不能在 targetBlock 内部,因为一旦满足上述两个条件,targetBlock 就不再处理任何消息了。此外,检查 sourceBlock 的完成状态会引入很多低效率。

你为什么需要了解这个? - svick
因为我完成一个进程后,即使在outQueue中仍有项目,所有目标块中的项目都完成处理。一个原因是该进程的完成,另一个原因是延迟、吞吐量性能的测量。链接的下一个数据块可能需要更长时间来处理outQueue中的所有剩余项目。 - Matt
为什么不只是等待源块的任务呢?你不需要附加一个块,只需要在源块的Completion属性上继续。 - casperOne
3个回答

1

我相信你不能直接这样做。你可以使用反射从一些private字段中获取这些信息,但我不建议这样做。

但是你可以通过创建自定义块来实现。在Complete()的情况下很简单:只需创建一个将每个方法转发到原始块的块。除了Complete(),它还会记录日志。

如果要确定所有项目的处理何时完成,您可以将您的块链接到一个中间的BufferBlock。这样,输出队列将被快速清空,因此检查内部块的Completed将为您提供相当准确的测量结果。这将影响您的测量,但希望不会显著影响。

另一个选择是在块的委托结尾添加一些日志记录。这样,您就可以看到最后一个项目的处理何时完成。


1
如果TransformBlock有一个ProcessingCompleted事件,当块完成其队列中所有消息的处理时触发,那将是很好的,但是实际上并没有这样的事件。下面是一种尝试纠正这个遗漏的方法。 CreateTransformBlockEx方法接受一个Action<Exception>处理程序,当此“事件”发生时调用该处理程序。
本意是在块最终完成之前始终调用处理程序。不幸的是,在提供的CancellationToken被取消的情况下,完成(取消)会首先发生,处理程序稍后几毫秒被调用。要解决此不一致性需要一些棘手的解决方法,并且可能会产生其他不必要的副作用,因此我将其保留为原样。
public static IPropagatorBlock<TInput, TOutput>
    CreateTransformBlockEx<TInput, TOutput>(Func<TInput, Task<TOutput>> transform,
    Action<Exception> onProcessingCompleted,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (onProcessingCompleted == null)
        throw new ArgumentNullException(nameof(onProcessingCompleted));
    dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();

    var transformBlock = new TransformBlock<TInput, TOutput>(transform,
        dataflowBlockOptions);
    var bufferBlock = new BufferBlock<TOutput>(dataflowBlockOptions);

    transformBlock.LinkTo(bufferBlock);
    PropagateCompletion(transformBlock, bufferBlock, onProcessingCompleted);
    return DataflowBlock.Encapsulate(transformBlock, bufferBlock);

    async void PropagateCompletion(IDataflowBlock block1, IDataflowBlock block2,
        Action<Exception> completionHandler)
    {
        try
        {
            await block1.Completion.ConfigureAwait(false);
        }
        catch { }
        var exception = 
            block1.Completion.IsFaulted ? block1.Completion.Exception : null;
        try
        {
            // Invoke the handler before completing the second block
            completionHandler(exception);
        }
        finally
        {
            if (exception != null) block2.Fault(exception); else block2.Complete();
        }
    }
}

// Overload with synchronous lambda
public static IPropagatorBlock<TInput, TOutput>
    CreateTransformBlockEx<TInput, TOutput>(Func<TInput, TOutput> transform,
    Action<Exception> onProcessingCompleted,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    return CreateTransformBlockEx<TInput, TOutput>(
        x => Task.FromResult(transform(x)), onProcessingCompleted,
        dataflowBlockOptions);
}

当使用PropagateCompletion = true选项调用时,本地函数PropagateCompletion的代码模仿了源代码中的LinkTo内置方法。

使用示例:

var httpClient = new HttpClient();
var downloader = CreateTransformBlockEx<string, string>(async url =>
{
    return await httpClient.GetStringAsync(url);
}, onProcessingCompleted: ex =>
{
    Console.WriteLine($"Download completed {(ex == null ? "OK" : "Error")}");
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 10
});

0

首先,使用IPropagator Block作为叶子终端是不正确的。但是,您仍然可以通过异步检查TargetBlock的输出缓冲区以获取输出消息,然后消耗它们以使缓冲区被清空来满足您的要求。

    `  BufferBlock<int> sourceBlock = new BufferBlock<int>();
       TransformBlock<int, int> targetBlock = new TransformBlock<int, int> 
       (element =>
       {

        return element * 2;
        });
        sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { 
        PropagateCompletion = true });

        //feed some elements into the buffer block
        for (int i = 1; i <= 100; i++)
        {
             sourceBlock.SendAsync(i);
        }

        sourceBlock.Complete();

        bool isOutputAvailable = await targetBlock.OutputAvailableAsync();
        while(isOutputAvailable)
        {
            int value = await targetBlock.ReceiveAsync();

            isOutputAvailable = await targetBlock.OutputAvailableAsync();
        }


        await targetBlock.Completion.ContinueWith(_ =>
        {
            Console.WriteLine("Target Block Completed");//notify completion of the target block
        });

`


2
如果您想清空TransformBlock的缓冲区,只需将其链接到NullTarget,如下所示:targetBlock.LinkTo(DataflowBlock.NullTarget<int>()); - Theodor Zoulias

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接