How do I wait for an item to go through a pipeline?

5

So, I'm trying to wrap my head around Microsoft's Dataflow library. I've built a very simple pipeline consisting of just two blocks:

var start = new TransformBlock<Foo, Bar>();
var end = new ActionBlock<Bar>();
start.LinkTo(end);

Now I can process Foo instances asynchronously by calling:
start.SendAsync(new Foo());

What I don't understand is how to do the processing synchronously when I need to. I thought that waiting on SendAsync would be enough:

start.SendAsync(new Foo()).Wait();

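But apparently it returns as soon as the item is accepted by the first block in the pipeline, not when the item has been fully processed. So is there a way to wait until a given item has been processed by the last (end) block, other than passing a WaitHandle through the entire pipeline?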


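I never use SendAsync, I always just use Send(). There is hardly any difference between the two. In the operating system, a timer tick moves the data from the application to the Ethernet card, so Send() and SendAsync() just fill the stream that the timer tick reads. - jdweng
2 Answers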

3
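In short, Dataflow does not support this out of the box. You need to tag the data so that you can retrieve it once processing is done. I've written up a way to do this that lets the consumer await a Job as it is processed by the pipeline. The only concession in the pipeline design is that each block takes a KeyValuePair<Guid, T>. This is the basic JobManager and the post I wrote about it. Note that the code in the post is a bit dated and needs some updates, but it should point you in the right direction.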
namespace ConcurrentFlows.DataflowJobs {
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    /// <summary>
    /// A generic interface defining that:
    /// for a specified input type => an awaitable result is produced.
    /// </summary>
    /// <typeparam name="TInput">The type of data to process.</typeparam>
    /// <typeparam name="TOutput">The type of data the consumer expects back.</typeparam>
    public interface IJobManager<TInput, TOutput> {
        Task<TOutput> SubmitRequest(TInput data);
    }

    /// <summary>
    /// A TPL-Dataflow based job manager.
    /// </summary>
    /// <typeparam name="TInput">The type of data to process.</typeparam>
    /// <typeparam name="TOutput">The type of data the consumer expects back.</typeparam>
    public class DataflowJobManager<TInput, TOutput> : IJobManager<TInput, TOutput> {

        /// <summary>
        /// It is anticipated that jobHandler is an injected
        /// singleton instance of a Dataflow based 'calculator', though this implementation
        /// does not depend on it being a singleton.
        /// </summary>
        /// <param name="jobHandler">A singleton Dataflow block through which all jobs are processed.</param>
        public DataflowJobManager(IPropagatorBlock<KeyValuePair<Guid, TInput>, KeyValuePair<Guid, TOutput>> jobHandler) {
            if (jobHandler == null) { throw new ArgumentNullException(nameof(jobHandler)); }

            // Store the injected handler (the constructor parameter, not the property).
            this.JobHandler = jobHandler;
            if (!alreadyLinked) {
                JobHandler.LinkTo(ResultHandler, new DataflowLinkOptions() { PropagateCompletion = true });
                alreadyLinked = true;
            }
        }

        // Static, so only the first handler per closed generic type gets linked to ResultHandler.
        private static bool alreadyLinked = false;

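        /// <summary>
        /// Submits the request to the JobHandler and asynchronously awaits the result.
        /// </summary>
        /// <param name="data">The input data to be processed.</param>
        /// <returns>A task that completes with the result once the pipeline has processed the job.</returns>
        public async Task<TOutput> SubmitRequest(TInput data) {
            var taggedData = TagInputData(data);
            var job = CreateJob(taggedData);
            Jobs.TryAdd(job.Key, job.Value);
            // SendAsync yields false when the block declines the item; fail fast instead of awaiting forever.
            if (!await JobHandler.SendAsync(taggedData)) {
                TaskCompletionSource<TOutput> declined;
                Jobs.TryRemove(job.Key, out declined);
                throw new InvalidOperationException($"The jobId: {job.Key} was declined by the JobHandler.");
            }
            return await job.Value.Task;
        }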

        private static ConcurrentDictionary<Guid, TaskCompletionSource<TOutput>> Jobs {
            get;
        } = new ConcurrentDictionary<Guid, TaskCompletionSource<TOutput>>();

        private static ExecutionDataflowBlockOptions Options {
            get;
        } = GetResultHandlerOptions();

        private static ITargetBlock<KeyValuePair<Guid, TOutput>> ResultHandler {
            get;
        } = CreateReplyHandler(Options);

        private IPropagatorBlock<KeyValuePair<Guid, TInput>, KeyValuePair<Guid, TOutput>> JobHandler {
            get;
        }

        private KeyValuePair<Guid, TInput> TagInputData(TInput data) {
            var id = Guid.NewGuid();
            return new KeyValuePair<Guid, TInput>(id, data);
        }

        private KeyValuePair<Guid, TaskCompletionSource<TOutput>> CreateJob(KeyValuePair<Guid, TInput> taggedData) {
            var id = taggedData.Key;
            var jobCompletionSource = new TaskCompletionSource<TOutput>();
            return new KeyValuePair<Guid, TaskCompletionSource<TOutput>>(id, jobCompletionSource);
        }

        private static ExecutionDataflowBlockOptions GetResultHandlerOptions() {
            return new ExecutionDataflowBlockOptions() {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                BoundedCapacity = 1000
            };
        }

        private static ITargetBlock<KeyValuePair<Guid, TOutput>> CreateReplyHandler(ExecutionDataflowBlockOptions options) {
            return new ActionBlock<KeyValuePair<Guid, TOutput>>((result) => {
                ReceiveOutput(result);
            }, options);
        }

        // Looks up the pending job by its tag and completes the task the submitter is awaiting.
        private static void ReceiveOutput(KeyValuePair<Guid, TOutput> result) {
            var jobId = result.Key;
            TaskCompletionSource<TOutput> jobCompletionSource;
            if (!Jobs.TryRemove(jobId, out jobCompletionSource)) {
                throw new InvalidOperationException($"The jobId: {jobId} was not found.");
            }
            var resultValue = result.Value;
            jobCompletionSource.SetResult(resultValue);
        }
    }
}

1
The blog link appears to be dead. For anyone who needs it, here is an archived version. - mcont
1
@MatteoContrini Thanks for the assist. If anyone needs it, here's a link to the code: https://github.com/ptsteward/ConcurrentFlows - JSteward

2
I ended up using the following pipeline:
var start = new TransformBlock<FooBar, FooBar>(...);
var end = new ActionBlock<FooBar>(item => item.Complete());
start.LinkTo(end);
var input = new FooBar {Input = new Foo()};
start.SendAsync(input);
input.Task.Wait();

Where

class FooBar
{
    public Foo Input { get; set; }
    public Bar Result { get; set; }
    public Task<Bar> Task { get { return _taskSource.Task; } }

    public void Complete()
    {
        _taskSource.SetResult(Result);
    }

    private TaskCompletionSource<Bar> _taskSource = new TaskCompletionSource<Bar>();
}

Not ideal, but it works.
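For anyone who would rather not block a thread with Wait(), here is a minimal, self-contained sketch of the same idea using await. The shapes of Foo and Bar, the doubling logic, and the Completion and Fail members are assumptions made up for illustration; they are not part of the code above.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-ins for the Foo and Bar types from the question.
public sealed class Foo { public int Value { get; set; } }
public sealed class Bar { public int Doubled { get; set; } }

// Envelope that travels through the pipeline and carries its own completion signal.
public sealed class FooBar
{
    // RunContinuationsAsynchronously keeps awaiting callers from resuming
    // inline on the thread that completes the task inside the pipeline.
    private readonly TaskCompletionSource<Bar> _taskSource =
        new TaskCompletionSource<Bar>(TaskCreationOptions.RunContinuationsAsynchronously);

    public Foo Input { get; set; }
    public Bar Result { get; set; }

    // Named Completion here (an assumption) instead of Task, to avoid clashing with the type name.
    public Task<Bar> Completion => _taskSource.Task;

    public void Complete() => _taskSource.TrySetResult(Result);
    public void Fail(Exception error) => _taskSource.TrySetException(error);
}

public static class Program
{
    public static async Task Main()
    {
        var start = new TransformBlock<FooBar, FooBar>(item =>
        {
            try
            {
                // Placeholder work: the real Foo -> Bar conversion goes here.
                item.Result = new Bar { Doubled = item.Input.Value * 2 };
            }
            catch (Exception ex)
            {
                // Surface the failure to the waiting caller instead of faulting the block.
                item.Fail(ex);
            }
            return item;
        });

        // The last block signals that this particular item is done.
        var end = new ActionBlock<FooBar>(item => item.Complete());
        start.LinkTo(end, new DataflowLinkOptions { PropagateCompletion = true });

        var input = new FooBar { Input = new Foo { Value = 21 } };
        await start.SendAsync(input);          // completes when the first block accepts the item
        Bar result = await input.Completion;   // completes when the end block has processed it
        Console.WriteLine(result.Doubled);     // prints 42

        // Shut the pipeline down cleanly.
        start.Complete();
        await end.Completion;
    }
}
```

TrySetResult and TrySetException are used so that an item that already failed in the first block is not completed a second time when it reaches the end block. If you really do need a synchronous call, input.Completion.Wait() behaves like the Wait() in the snippet above.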

Content provided by Stack Overflow.