同时等待多个任务的设计模式

3
这是我的方法。我有几个`IEventProviders`,
interface IEventProvider
{
    Task<Event> GetEvent();
}

然后我创建了一个容器类来包装它们,并不断调用和等待 GetEvent() 来等待下一个事件,例如 socket 异步接收、定时器滴答声等。

class EventProviderContainer : IEventProvider
{
    private IEventProvider[] _providers;
    private Task<Event>[] _tasks;

    public EventProviderContainer(params IEventProvider[] providers)
    {
        _providers = providers;
    }

    public async Task<Event> GetEvent()
    {
        // Fill the _tasks first time we call the method.
        if (_tasks == null)
            _tasks = (from p in _providers select p.GetEvent()).ToArray();

        Task<Event> task = await Task<Event>.WhenAny(_tasks);

        // Get the provider index whose previous task is done.
        int index = Array.IndexOf(_tasks, task);
        // put next event of the provider into array.
        _tasks[index] = _providers[index].GetEvent();

        return await task;
    }
}

我认为这有点丑陋。有没有更好的方法呢?


2
这看起来很奇怪。你在尝试什么? - Daniel Hilgarth
所有的提供者都有无限数量的事件吗?如果不是,你将如何找出一个提供者已经完成了? - svick
1
像这样的“事件流”通常更适合于TPL Dataflow或Rx,将您的事件提供程序视为“推送器”,而不是不断地从它们中“拉取”事件,这将更加自然。 - Stephen Cleary
@svick,是的,它们都有无限数量的事件。 - Logan
@StephenCleary 好观点。谢谢。 - Logan
3个回答

2

对于一个实际上并不那么直接的任务来说,你的代码非常简短易懂,个人认为它并不丑陋。

我认为你不太可能找到一种更好的写法,除非你想改变整个接口。唯一需要改变的是将初始化_tasks移动到构造函数中(但也许你有理由这样做)。

但我同意Stephen的评论,对于事件,使用“push”语义通常比“pull”更合适。为此,Rx (IObservable<Event>)或TPL Dataflow (ISourceBlock<Event>)将非常有用。在这两种情况下,编写EventProviderContainer都相对简单。哪一种选择更好取决于你如何处理结果。


1

如果您想要每次只处理一个提供程序的事件,那么我建议您查看按完成情况处理任务MSDN文章,其中包括一个Interleaved方法。该方法接受一组任务,并返回一个新的任务数组,这些任务将按完成顺序产生。

另一方面,如果您想要持续接收来自每个提供程序的事件,则建议您查看Microsoft的响应式扩展(Rx)项目。

使用Rx,您的事件提供程序接口将变成:

public interface IEventProvider
{
    IObservable<Event> OnEvent();
}

然后您的容器提供程序将使用Observable.Merge扩展方法来组合每个子提供程序的事件。

return _providers.Select(provider => provider.OnEvent()).Merge();

要实际接收事件,您需要订阅可观察对象,通过附加回调委托来执行每次新事件可用时的操作。

var provider = new EventProviderContainer(
    new TestEventProvider("a", 1000),
    new TestEventProvider("b", 1300),
    new TestEventProvider("c", 1600));
provider.OnEvent().Subscribe(Console.WriteLine);
Console.ReadLine();

上面的示例使用了一个测试事件提供程序,它使用Observable.Timer扩展方法以毫秒为单位返回给定周期内连续的事件流。

return Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(_period))
                 .Select(i => new TestEvent(_name, i));

当你有多个“任务流”时,你会如何使用它?(这就是问题所在。) - svick

0

我认为你想要实现的正确代码是这样的:

    public async Task<Event> GetEvent()
    {
        // Fill the _tasks first time we call the method.
        if (_tasks == null)
            _tasks = (from p in _providers select p.GetEvent()).ToArray();


        return await await Task<Event>.WhenAny(_tasks);
    }

await await 看起来有点奇怪,但由于 WhenAny() 返回一个 Task<Task<T>>,所以这样做是正确的。


但是这样做不会像原始代码一样工作。而且原始代码已经有两个await了。 - svick
抱歉,在原始代码中,我在数组中更新了提供程序以进行下一个任务,前提是它的一个任务已经完成。 - Logan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接