我正在使用响应式扩展(Rx)来缓冲一些数据。但是我遇到了一个问题,我需要对这些数据进行异步操作,但在异步操作完成之前,我不希望缓冲区将下一组数据通过。
我尝试过两种方式来结构化代码(假设的例子):
我尝试过两种方式来结构化代码(假设的例子):
public async Task processFiles<File>(IEnumerable<File> files)
{
await files.ToObservable()
.Buffer(10)
.SelectMany(fi => fi.Select(f => upload(f)) //Now have an IObservable<Task>
.Select(t => t.ToObservable())
.Merge()
.LastAsync();
}
public Task upload(File item)
{
return Task.Run(() => { //Stuff });
}
或者
public async Task processFiles<File>(IEnumerable<File> files)
{
var buffered = files.ToObservable()
.Buffer(10);
buffered.Subscribe(async files => await Task.WhenAll(files.Select(f => upload(f)));
await buffered.LastAsync();
}
public Task upload(File item)
{
return Task.Run(() => { //Stuff });
}
不幸的是,这两种方法都不起作用,因为缓冲区在异步操作完成之前就推送了下一个组。意图是让每个缓冲组以异步方式执行,并且只有当该操作完成后,才继续处理下一个缓冲组。
非常感谢您的帮助。
upload()
。而且Task.WhenAll()
无法在File
集合上工作。 - svick