使用TPL Dataflow封装以动作块结束的管道。

8
TPL Dataflow 提供了一个非常实用的功能:
public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput>(
    ITargetBlock<TInput> target, 
    ISourceBlock<TOutput> source)

为了让您能够将多个块封装到一个转换块中,它能够返回一个

IPropagatorBlock<TInput, TOutput>

这代表着你的管道的起始和结束块。

然而,如果我的管道中最后一个块是ActionBlock,我就不能使用它,因为ActionBlock不是SourceBlock,函数的返回类型将是ITargetBlock,而不是IPropagatorBlock。

本质上,我正在寻找类似于以下的函数:

public static ITargetBlock<TStart> Encapsulate<TStart, TEnd>(
        ITargetBlock<TStart> startBlock, 
        ActionBlock<TEnd> endBlock)

这是一个明智的写法,还是我错过了一些简单的东西?我不太确定如何写它 - 特别是完成部分的接线。我需要创建自己的自定义块类型吗?
编辑:
好的,所以读了@Panagiotis Kanavos的回答并进行了一些尝试,我想出了这个方案。这基于EncapsulatingPropagator类,这就是现有的DataflowBlock.Encapsulate方法使用的类。
internal sealed class EncapsulatingTarget<TStart, TEnd> : ITargetBlock<TStart>
{
        private readonly ITargetBlock<TStart> startBlock;

        private readonly ActionBlock<TEnd> endBlock;

        public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
        {
            this.startBlock = startBlock;
            this.endBlock = endBlock;
        }

        public Task Completion
        {
            get { return this.endBlock.Completion; }
        }

        public void Complete()
        {
            this.startBlock.Complete();
        }

        void IDataflowBlock.Fault(Exception exception)
        {
            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            this.startBlock.Fault(exception);
        }

        public DataflowMessageStatus OfferMessage(
            DataflowMessageHeader messageHeader, 
            TStart messageValue, 
            ISourceBlock<TStart> source, 
            bool consumeToAccept)
        {
            return this.startBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
        }
    }

我刚刚完成了类似的更新!很好地将其通用化 - 这使您可以将模块/片段的构建与样板代码分离。 - Panagiotis Kanavos
能够将“终止”管道的初始目标和完成作为单个ActionBlock样实体处理,似乎是如此明显的需求,以至于API中的差距几乎感觉有意。但我会忽略这个问题,使用你在这里的工作。谢谢。 - Marc L.
2个回答

4

Encapsulate方法不是用于抽象现有的管道,而是用于创建需要自定义行为且无法使用现有块和链接实现的传播器块。

例如,滑动窗口示例会缓冲所有发送到其输入块的传入消息,并在滑动窗口过期时将所有检索到的消息批量输出到其输出块。

方法名称会引起很多混淆,但当你理解它们的目的时,它们确实有意义:

  • target参数是目标(输入)端点,前面的块将连接到该端点以发送消息。在这种情况下,处理传入消息并决定是否将其发布到输出(源)块的ActionBlock是有意义的。
  • source参数是源(输出)端点,后续步骤将连接到该端点以接收消息。使用ActionBlock作为源是没有意义的,因为它没有任何输出。

接受ActionBlock方法作为源的Encapsulate变体是无用的,因为您可以从任何先前的步骤链接到操作块。

编辑

如果您想要将管道模块化,即将其分解为可重用的、更易管理的部分,则可以创建一个构造类。在该类中,您可以像平常一样构建管道片段,链接块(确保完成被传播),然后公开第一步和最后一步的Completion任务作为公共属性,例如:

class MyFragment
{
    public TransformationBlock<SomeMessage,SomeOther> Input {get;}

    public Task Completion {get;}

    ActionBlock<SomeOther> _finalBlock;

    public MyFragment()
    {
        Input=new TransformationBlock<SomeMessage,SomeOther>(MyFunction);
        _finalBlock=new ActionBlock<SomeOther>(MyMethod);
        var linkOptions = new DataflowLinkOptions {PropagateCompletion = true}
        Input.LinkTo(_finalBlock,linkOptions);
    }

    private SomeOther MyFunction(SomeMessage msg)
    {
    ...
    }

    private void MyMethod(SomeOther msg)
    {
    ...
    }
}

将片段连接到管道只需要从管道块链接到暴露的块。要等待完成,只需等待暴露的任务。

如果您愿意,您可以在此处停止,或者您可以实现ITargetBlock,使片段看起来像一个目标块。您只需要将所有方法委托给Input块和Completion属性传递给最终块即可。

例如:

class MyFragment:ITargetBlock<SomeMessage> 
{
    ....

    public Task Completion {get;}

    public void Complete()
    {
        Input.Complete()
    };

    public void Fault(Exception exc)
    {
        Input.Fault(exc);
    }

    DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
        TInput messageValue,ISourceBlock<TInput> source,bool consumeToAccept)
    {
        return Input.OfferMessage(messageHeader,messageValue,source,consumeToAccept);
    }
}

编辑2

使用@bornfromanegg的类,可以将构建片段与暴露输入和完成的样板代码分开:

public ITargetBlock<SomeMessage> BuildMyFragment()
{
    var input=new TransformationBlock<SomeMessage,SomeOther>(MyFunction);
    var step2=new TransformationBlock<SomeOther,SomeFinal>(MyFunction2);
    var finalBlock=new ActionBlock<SomeFinal>(MyMethod);

    var linkOptions = new DataflowLinkOptions {PropagateCompletion = true}

    input.LinkTo(step2,linkOptions);
    step2.LinkTo(finalBlock,linkOptions);

    return new EncapsulatingTarget(input,finalBlock);
}

我在某种程度上同意。事实上,不仅使用ActionBlock作为源没有意义 - 因为它没有实现ISourceBlock,所以根本无法将其用作源。但我仍然有一个问题要解决:我有一个管道,其中最后一个块是一个操作块,并且我想对其进行封装。 - bornfromanegg
“Encapsulate”方法有一个非常特定(且有限)的目的——它不是通用的封装机制。当您需要在管道中包含具有明确定义的输入和输出端点的复杂图形或代码时,它非常有用。这是实现自己的块类的快捷方式。在您的情况下,您只需要从管道的其余部分到您的第一个块的链接,并等待最后一个块的完成任务——就像您使用任何管道一样。您甚至可以将块包含在类中,将输入块和ActionBlock的完成任务公开为属性。 - Panagiotis Kanavos
我认为你实际上想要做的是模块化,而不是封装一个管道。 - Panagiotis Kanavos
是的,那可能就是我的意思。我会编辑问题的。 - bornfromanegg
太棒了 - 这正是我在寻找的。谢谢你。 - bornfromanegg

2
在我的情况下,我想要封装一个包含多个最终ActionBlock的网络,并进行汇总完成,因此编辑问题中概述的解决方案不起作用。
由于与“最终块”唯一的交互涉及完成,因此仅呈现完成任务就足以封装。 (根据建议添加了目标-动作构造函数。)
public class EncapsulatingTarget<TInput> : ITargetBlock<TInput>
{
    private readonly ITargetBlock<TInput> startBlock;

    private readonly Task completion;

    public EncapsulatingTarget(ITargetBlock<TInput> startBlock, Task completion)
    {
        this.startBlock = startBlock;
        this.completion = completion;
    }

    public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
    {
        this.startBlock = startBlock;
        completion = endBlock.Completion;
    }

    public Task Completion => completion;

    public void Complete()
    {
        startBlock.Complete();
    }

    void IDataflowBlock.Fault(Exception exception)
    {
        if (exception == null)
        {
            throw new ArgumentNullException("exception");
        }

        startBlock.Fault(exception);
    }

    public DataflowMessageStatus OfferMessage(
        DataflowMessageHeader messageHeader,
        TInput messageValue,
        ISourceBlock<TInput> source,
        bool consumeToAccept)
    {
        return startBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }
}

一个使用示例:

public ITargetBlock<Client.InputRecord> BuildDefaultFinalActions()
{
    var splitter = new BroadcastBlock<Client.InputRecord>(null);
    var getresults = new TransformManyBlock(...);    // propagator
    var saveinput = new ActionBlock(...);
    var saveresults = new ActionBlock(...);

    splitter.LinkTo(saveinput, PropagateCompletion);
    splitter.LinkTo(getresults, PropagateCompletion);
    getresults.LinkTo(saveresults, PropagateCompletion);

    return new Util.EncapsulatedTarget<Client.InputRecord>(splitter, Task.WhenAll(saveinput.Completion, saveresults.Completion));
}

我本可以将签名改为 EncapsulatingTarget<T>(ITargetBlock<T> target, params Task[] completions) 并将 WhenAll(...) 移至构造函数内部,但我不想对期望的完成通知做出假设。


1
这看起来是一个有用的补充。事实上,我认为你可以拥有 两个 构造函数,以及原始的带有 ActionBlock 的构造函数。这样,你就给调用者选择使用哪个的选择。 - bornfromanegg

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接