一个TPL Dataflow块下游如何获取由源产生的数据?

8

我正在使用TPL Dataflow处理图像。我接收到一个处理请求,从流中读取图像,应用多个转换,然后将结果图像写入另一个流:

Request -> Stream -> Image -> Image ... -> Stream

为此,我使用块:

BufferBlock<Request>
TransformBlock<Request,Stream>
TransformBlock<Stream,Image>
TransformBlock<Image,Image>
TransformBlock<Image,Image>
...
writerBlock = new ActionBlock<Image>

问题在于初始的 Request 包含创建结果 Stream 所需的一些数据以及我在那时需要的一些附加信息。我是否必须将原始的 Request(或其他上下文对象)沿着所有其他块传递到 writerBlock 这样的地方:

TransformBlock<Request,Tuple<Request,Stream>>
TransformBlock<Tuple<Request,Stream>,Tuple<Request,Image>>
TransformBlock<Tuple<Request,Image>,Tuple<Request,Image>>
...

这段文字中提到了一个“丑陋”的问题,那么有没有办法将第一个块链接到最后一个块(或者更一般地说,链接到需要额外数据的块)?

2个回答

8

是的,你基本上需要按照你所描述的方式,将附加数据从每个块传递到下一个块。

但是使用一些辅助方法,你可以使这个过程变得更简单:

public static IPropagatorBlock<TInput, Tuple<TOutput, TInput>>
    CreateExtendedSource<TInput, TOutput>(Func<TInput, TOutput> transform)
{
    return new TransformBlock<TInput, Tuple<TOutput, TInput>>(
        input => Tuple.Create(transform(input), input));
}

public static IPropagatorBlock<Tuple<TInput, TExtension>, Tuple<TOutput, TExtension>>
    CreateExtendedTransform<TInput, TOutput, TExtension>(Func<TInput, TOutput> transform)
{
    return new TransformBlock<Tuple<TInput, TExtension>, Tuple<TOutput, TExtension>>(
        tuple => Tuple.Create(transform(tuple.Item1), tuple.Item2));
}

签名看起来很可怕,但实际上并不那么糟糕。

此外,您可能希望添加传递选项到创建的块的重载,或者采用异步委托的重载。

例如,如果您想使用单独的块对数字执行一些操作,并在沿途传递原始数字,您可以这样做:

var source = new BufferBlock<int>();
var divided = CreateExtendedSource<int, double>(i => i / 2.0);
var formatted = CreateExtendedTransform<double, string, int>(d => d.ToString("0.0"));
var writer = new ActionBlock<Tuple<string, int>>(tuple => Console.WriteLine(tuple));

source.LinkTo(divided);
divided.LinkTo(formatted);
formatted.LinkTo(writer);

for (int i = 0; i < 10; i++)
    source.Post(i);

正如您所看到的,您的Lambda(最后一个除外)仅处理“当前”值(根据管道阶段而定,可以是intdoublestring),而“原始”值(始终为int)会自动传递。在任何时候,您都可以使用正常构造函数创建的代码块来访问这两个值(就像示例中的最终ActionBlock一样)。
(那个BufferBlock实际上并不是必需的,但我添加它是为了更好地匹配您的设计。)

这是一个不错的方法。我在 TDF 上读过你的一些答案,你显然很了解这个主题。那么你是说没有办法绕过中间块将上下文数据发送到数据流网络吗? - chase
1
如果您确信整个管道中的项目顺序将得到维护(TransformBlock确实会维护顺序),那么您可以使用类似于附加数据的全局队列。第一个块将其添加到队列中,而最后一个块将从队列中读取它。但我认为这样做会很脆弱。这意味着每个输入在整个管道中必须产生恰好一个输出。 - svick

1
我可能有些超前了,因为我才刚开始尝试使用TPL Dataflow。 但我相信你可以使用一个BroadcastBlock作为源和第一个目标之间的中介来完成这个任务。 BroadcastBlock可以将消息提供给多个目标,所以你可以使用它提供给你的目标,并且还可以提供给一个JoinBlock,最后将结果与原始消息合并。
source -> Broadcast ->-----------------------------------------> JoinBlock <source, result>
                    -> Transformation1 -> Transformation 'n'  ->

例如:

var source = new BufferBlock<int>();
var transformation =  new TransformBlock<int, int>(i => i * 100);

var broadCast = new BroadcastBlock<int>(null);
source.LinkTo(broadCast);
broadCast.LinkTo(transformation);

var jb = new JoinBlock<int, int>();
broadCast.LinkTo(jb.Target1);
transformation.LinkTo(jb.Target2);

jb.LinkTo(new ActionBlock<Tuple<int, int>>(
          c => Console.WriteLine("Source:{0}, Target Result: {1}", c.Item1, c.Item2)));

source.Post(1);
source.Post(2);

source.Complete();

产量...

来源:1,目标结果:100

来源:2,目标结果:200

我只是不太确定它在异步环境中的行为会如何。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接