使响应式扩展缓冲区等待异步操作完成

6
我正在使用响应式扩展(Rx)来缓冲一些数据。但是我遇到了一个问题,我需要对这些数据进行异步操作,但在异步操作完成之前,我不希望缓冲区将下一组数据通过。
我尝试过两种方式来结构化代码(假设的例子):
public async Task processFiles<File>(IEnumerable<File> files)
{
    await files.ToObservable()
        .Buffer(10)
        .SelectMany(fi => fi.Select(f => upload(f)) //Now have an IObservable<Task>
        .Select(t => t.ToObservable())
        .Merge()
        .LastAsync();
}

public Task upload(File item)
{
    return Task.Run(() => { //Stuff });
}

或者

public async Task processFiles<File>(IEnumerable<File> files)
{
    var buffered = files.ToObservable()
        .Buffer(10);

    buffered.Subscribe(async files => await Task.WhenAll(files.Select(f => upload(f)));

    await buffered.LastAsync();
}

public Task upload(File item)
{
    return Task.Run(() => { //Stuff });
}

不幸的是,这两种方法都不起作用,因为缓冲区在异步操作完成之前就推送了下一个组。意图是让每个缓冲组以异步方式执行,并且只有当该操作完成后,才继续处理下一个缓冲组。

非常感谢您的帮助。


http://channel9.msdn.com/Shows/Going+Deep/Rx-Update-Async-support-IAsyncEnumerable-and-more-with-Jeff-and-Wes - spender
你的第二个例子没有意义,你根本没有使用 upload()。而且 Task.WhenAll() 无法在 File 集合上工作。 - svick
@svick 打错了,已更正。 - MgSam
2个回答

3
为了确保我理解你的意思,你想确保在处理前一个缓冲区时仍保持缓冲项目,而仅在处理完前一个缓冲区后呈现每个缓冲区。同时,你还需要将每个缓冲区的处理设置为异步。
可能有必要考虑一些理论问题,因为我必须承认,我对这种方法有点困惑。IObservable通常被称为IEnumerable的对偶,因为它通过数据向消费者“推送”而不是消费者自己选择“拉取”来模拟后者,并且两者的关键区别在于数据是由生产者推送而不是由消费者拉取。
你尝试使用缓冲流作为IEnumerable而不是IObservable - 实际上,你想要拉取缓冲区而不是让它们被推送给你 - 因此,我不得不想知道是否选择了正确的工具来完成这项工作?你是否试图在处理缓冲区时阻止缓冲操作本身?对于接收数据推送的消费者来说,这不是真正的正确方法。
你可以考虑在缓冲操作中应用ToEnumerable()调用,以便在准备好缓冲区时处理它们。但是这并不能防止继续发生缓冲,即使你正在处理当前的缓冲区。
你很难阻止这种情况发生-在应用于缓冲的Select()操作中同步处理缓冲可以保证在选择投影完成之前不会发生后续的OnNext()调用。Rx库运算符执行Rx语法,因此保证是免费的。但只保证了OnNext()调用之间的非重叠调用 - 没有任何方法说特定运算符不能(并且确实不应该)继续获取下一个准备就绪的OnNext()。这是推动基础API的性质。
如果你想阻止缓冲区,为什么你认为需要将投影设置为异步很不清楚?请考虑这个问题 - 我怀疑在观察者中使用同步的Select()可能会解决问题,但从你的问题中并不完全清楚。
类似于同步的Select(),同步的OnNext()处理程序更容易处理, 但它们不同,因为(根据Observable的实现)你只能阻止将OnNext()调用传递给那个Subscriber而不是所有Subscribers。但是,对于只有一个Subscriber的情况,它是等效的,所以你可以做以下操作:
void Main()
{
    var source = Observable.Range(1, 4);

    source.Buffer(2)
        .Subscribe(i =>
    {
        Console.WriteLine("Start Processing Buffer");
        Task.WhenAll(from n in i select DoUpload(n)).Wait();
        Console.WriteLine("Finished Processing Buffer");
    });
}

private Task DoUpload(int i)
{
    return Task.Factory.StartNew(
        () => {
            Thread.Sleep(1000);
            Console.WriteLine("Process File " + i);
        });
}

以下是输出结果(*无法保证缓冲区内进程文件x的顺序):
Start Processing Buffer
Process File 2
Process File 1
Finished Processing Buffer
Start Processing Buffer
Process File 3
Process File 4
Finished Processing Buffer

如果你想使用Select()函数,但是你的查询结果为空,你可以按照以下方式操作:

source.Buffer(2)
    .Select(i =>
{
    Console.WriteLine("Start Processing Buffer");
    Task.WhenAll(from n in i select DoUpload(n)).Wait();
    Console.WriteLine("Finished Processing Buffer");
    return Unit.Default;
}).Subscribe();

NB: 这段代码是在 LINQPad 中编写的,并包括 Nuget 包 Rx-Main。这段代码仅用于说明目的 - 不要在生产代码中使用 Thread.Sleep()


2
首先,我认为您要求每个组中的项目并行执行,但每个组依次执行是相当不寻常的。更常见的需求是并行执行项目,但最多同时执行n个项目。这样,就没有固定的组,因此如果单个项目花费太长时间,其他项目不必等待它。
为了实现您的要求,我认为TPL Dataflow比Rx更适合(尽管一些Rx代码仍将有用)。TPL Dataflow以“块”为中心执行任务,默认情况下是依次执行,这正是您需要的。
您的代码可能如下所示:
public static class Extensions
{
    public static Task ExecuteInGroupsAsync<T>(
         this IEnumerable<T> source, Func<T, Task> func, int groupSize)
     {
         var block = new ActionBlock<IEnumerable<T>>(
             g => Task.WhenAll(g.Select(func)));
         source.ToObservable()
               .Buffer(groupSize)
               .Subscribe(block.AsObserver());
         return block.Completion;
     }
}

public Task ProcessFiles(IEnumerable<File> files)
{
    return files.ExecuteInGroupsAsync(Upload, 10);
}

这使得大部分的重活都落在了ActionBlock上(以及一些Rx上)。数据流块可以作为Rx观察者(和可观察者)来使用,因此我们可以利用它来继续使用Buffer()
我们想要一次处理整个组,所以我们使用Task.WhenAll()创建一个任务,当整个组完成时完成。数据流块理解返回Task的函数,所以下一组将在前一组返回的Task完成之后开始执行。
最终结果是Completion Task,在源可观察对象完成并且所有处理完成后完成。
TPL Dataflow还有BatchBlock,它的工作原理类似于Buffer(),我们可以直接从集合中Post()每个项目(而不使用ToObservable()AsObserver()),但我认为在代码的这部分中使用Rx会更简单。
编辑:实际上在这里你根本不需要TPL Dataflow。如James World所建议的那样使用ToEnumerable()就足够了。
public static async Task ExecuteInGroupsAsync<T>(
     this IEnumerable<T> source, Func<T, Task> func, int groupSize)
{
    var groups = source.ToObservable().Buffer(groupSize).ToEnumerable();
    foreach (var g in groups)
    {
        await Task.WhenAll(g.Select(func));
    }
}

甚至可以更简单,不使用Rx,而是使用morelinq中的Batch()

public static async Task ExecuteInGroupsAsync<T>(
    this IEnumerable<T> source, Func<T, Task> func, int groupSize)
{
    var groups = source.Batch(groupSize);
    foreach (var group in groups)
    {
        await Task.WhenAll(group.Select(func));
    }
}

我不确定我同意我所尝试的与 Rx 的推送本质相反。我的理解是 Batch 方法被设计成在处理完成之前不会触发(滑动窗口,如果你愿意的话)。我只是想将这种行为扩展到异步环境中。无论如何,将其改回可枚举对象似乎已经奏效了。谢谢。 - MgSam
@MgSam 这是针对我还是针对詹姆斯说的?我没有提到“推送性质”的任何事情。 - svick
我以为你同意那个推理,因为你提到他的帖子然后建议使用 ToEnumerable。抱歉我误解了。 - MgSam

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接