如何使用TPL数据流在预定义块之上创建可重用的处理逻辑?

4

我喜欢TPL数据流。

有趣的设计选择是,大多数预定义块使用委托来允许我们实现处理逻辑。这在简单情况下看起来不错。但是让我们考虑需要模块化和封装的真实世界大型应用程序。我发现使用委托方法编写结构良好的应用程序很困难和不自然。

例如,如果我想要一个可重用的类类型MultiplyIntByTwoTransformBlockNoOpActionBlock。我该如何实现?我希望能够从TransformBlock/ActionBlock继承并覆盖一些Process()方法来实现这一点。但是预定义块是密封的。它们只接受委托。

我知道我可以从头开始创建自定义块,但显然对我来说过于复杂,因为我所需的仅是在预定义块之上进行少量定制。

那么,我该如何实现我的目标?

更新: 我并不是说委托无法实现某些功能。我是说,在许多情况下,使用模板方法模式公开抽象块更好。比如说,我希望能够编写一个抽象的MultiplyBlockMultiplyByTwoBlockMultiplyByThreeBlock,利用多态性。不幸的是,委托不能提供这种数据和逻辑可重用性。


1
顺便提一下,你的 NoOpActionBlock 已经存在于 DataflowBlock.NullTarget() 中了。 - svick
是的,我在发布问题后找到了神奇的NullTarget()方法。无论如何,还是谢谢你 :) - Dodd
在 TPL Dataflow 中选择遵循函数式风格而不是传统的面向对象编程,这是一个非常有意义的设计选择。大多数专注于 Actor 模式的库甚至 语言 都采用了这种方法,这并非偶然。这是一种自然契合的方式。如果你正确地使用它,我认为你根本不会牺牲重用性或封装性,并且你可以在你的块内部整天使用对象。 - Todd Menier
4个回答

5
我认为你不需要自定义块类型,使用辅助方法就足够了。
public static IPropagatorBlock<int, int> CreateMultiplyIntTransformBlock(
    int multiplier)
{
    return new TransformBlock<int, int>(i => i * multiplier);
}

public static IPropagatorBlock<int, int> CreateMultiplyIntByTwoTransformBlock()
{
    return CreateMultiplyIntTransformBlock(2);
}

如果您认为委托不足以满足您的需求,那么也许您正在错误的地方放置逻辑。没有理由委托不能使用正确使用封装和模块化的对象。这样,您的应用程序逻辑就与执行代码的逻辑分开了。
但是,如果您真的想做您所要求的事情,您可以通过将“TransformBlock”封装在实现“IPropgatorBlock”的自定义类中,并且还具有抽象的“Process()”方法来实现。但是,要正确地完成这项工作有点复杂,请查看Guide to Implementing Custom TPL Dataflow Blocks获取详细信息。

谢谢你,svick。你提供的辅助函数很有效。但是它仍然暴露了一个实例,而不是一个类型。委托是块内的ACTION,我不希望它暴露给外部,比如将块链接在一起的管道类。 - Dodd
我并不是说委托无法做到这一点。我是说,在模板方法模式中公开抽象块在许多场景下更好。比如说,我希望我可以编写一个AbstractMultiplyBlock、MultiplyByTwoBlock和MultiplyByThreeBlock,利用多态的优势。但是遗憾的是,委托并不提供这种数据&逻辑的可重用性。谢谢。 - Dodd
@Dodd 我仍然不明白为什么你需要类型,你也可以使用委托来实现这种可重用性,参见编辑。如果您想要使用多态性,您可以从委托中调用另一种类型的多态方法。例如:AbstractMultiplicator multiplicator = new MultiplicateByTwo(); var block = new TransformBlock<int, int>(i => multiplicator.Multiplicate(i));。我认为这种方式可以更好地分离关注点。 - svick
你的AbstractMultiplicator对我来说很有意义,svick。基本上我们将处理器从块中移出,并在委托内部引用/捕获它们。虽然委托设计选择的原因仍然未知,但你的示例运行得非常好,我们确实获得了多态性。对你的回答点赞。非常感谢。 - Dodd
嗨@svick,抱歉挖出这个旧帖子,但正如我在另一个答案中提到的那样,现在有一个库[DataflowEx](https://github.com/gridsum/dataflowex)来处理这个问题(以及更多)。由于您是TPL Dataflow的专家,如果您能花些时间帮助审查该库,那将是非常棒的。谢谢;) - Dodd

4
现在有一个开源库DataflowEx,专门设计用来解决这个问题。此外,它还提供了更多功能来帮助构建和表示数据流图。
免责声明:我是DataflowEx的作者。它是为了回答我的问题而创建的。希望它也能帮助其他人 :)

1
你可以创建一个抽象类型,该类型实现与目标块类型相同的接口(TransformBlock 实现 IPropagatorBlockIReceivableSourceBlock)。
而不是复制该块的行为,将所有方法调用委托给该类型的一个 innerBlock
public abstract class AbstractMultiplyBlock<TInput, TOutput>
    : IPropagatorBlock<TInput, TOutput>, IReceivableSourceBlock<TOutput>
{
    private readonly TransformBlock<TInput, TOutput> innerBlock;

    protected AbstractMultiplyBlock(TransformBlock<TInput, TOutput> innerBlock)
    {
        this.innerBlock = innerBlock;
    }

    // ... interface implementations omitted for brevity, see appendix
}

这个抽象类具有与 TransformBlock 类相同的属性和方法。现在,创建派生类型,将 TransformBlock 的实例传递给基础构造函数。
public sealed class MultiplyByTwoBlock : AbstractMultiplyBlock<int, int>
{
    public MultiplyByTwoBlock()
        : base(new TransformBlock<int, int>(x => x * 2))
    {
    }
}

public sealed class MultiplyByThreeBlock : AbstractMultiplyBlock<int, int>
{
    public MultiplyByThreeBlock()
        : base(new TransformBlock<int, int>(x => x * 3))
    {
    }
}

使用方法与其他任何 TransformBlock 实例相同。

var calculator1 = new MultiplyByTwoBlock();
var calculator2 = new MultiplyByThreeBlock();
calculator1.LinkTo(calculator2);

// x = 10 * 2 * 3
calculator1.Post(10);
var x = calculator2.Receive();

附录

AbstractMultiplyBlock 的完整源代码

public abstract class AbstractMultiplyBlock<TInput, TOutput>
    : IPropagatorBlock<TInput, TOutput>, IReceivableSourceBlock<TOutput>
{
    private readonly TransformBlock<TInput, TOutput> innerBlock;

    protected AbstractMultiplyBlock(TransformBlock<TInput, TOutput> innerBlock)
    {
        this.innerBlock = innerBlock;
    }

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source,
        bool consumeToAccept)
    {
        return ((ITargetBlock<TInput>)innerBlock).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }

    public void Complete()
    {
        innerBlock.Complete();
    }

    public void Fault(Exception exception)
    {
        ((IDataflowBlock)innerBlock).Fault(exception);
    }

    public Task Completion
    {
        get { return innerBlock.Completion; }
    }

    public IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)
    {
        return innerBlock.LinkTo(target, linkOptions);
    }

    public TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out bool messageConsumed)
    {
        return ((ISourceBlock<TOutput>)innerBlock).ConsumeMessage(messageHeader, target, out messageConsumed);
    }

    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
    {
        return ((ISourceBlock<TOutput>)innerBlock).ReserveMessage(messageHeader, target);
    }

    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
    {
        ((ISourceBlock<TOutput>)innerBlock).ReleaseReservation(messageHeader, target);
    }

    public bool TryReceive(Predicate<TOutput> filter, out TOutput item)
    {
        return innerBlock.TryReceive(filter, out item);
    }

    public bool TryReceiveAll(out IList<TOutput> items)
    {
        return innerBlock.TryReceiveAll(out items);
    }
}

感谢你的示例,Steven。这个很优雅地工作。不过,“将所有方法调用委托给内部块”存在一些性能问题。 - Dodd
你能不能让 IDataflow 继承 IDataflowBlock 呢?这样,你就可以将整个图形视为一个块。 - Steven Liekens
在DataflowEx设计中,您可以拥有仅包含一个块的IDataflow。IDataflow不扩展IDataflowBlock的原因很简单,因为IDataflow通常表示许多块,这种继承可能会使用户感到困惑。另一方面,这并不会造成问题。对吧? :) - Dodd
我在想,您可能希望构建数据流网络的数据流网络。除非您可以创建递归引用其他IDataflow对象的IDataflow对象,否则无法实现该目标。 - Steven Liekens
因此,想法是将整个图形视为单个块。 - Steven Liekens
显示剩余4条评论

0

到目前为止,我的自定义块都是简单地编写一堆工厂方法,使用 DataflowBlock.Encapsulate(如果需要的话)。

因此,对于一个简单的扩展 TransformBlock 的块,它将返回传递给它的相同项目,看起来会像这样:

public static class MutatorBlock {
    public static TransformBlock<T, T> New<T>(Action<T> action, ExecutionDataflowBlockOptions options) {
        return new TransformBlock<T, T>(
            input => {
                action(input);
                return input;
            }, options);
    }
}

要实例化这个类,您可以使用MutatorBlock.New(...)而不是new MutatorBlock(...),但除此之外,没有太大的区别。
我猜我想知道的是,您到底需要什么类型?继承行不通,但组合仍然可以。您可以举个例子,说明这是一个重大问题吗?

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接