这似乎需要使用微软的响应式框架。
我从以下代码作为我的初始变量开始:
var items = Enumerable.Range(0, 10).ToArray();
Func<int, bool> Predicate = x => x
Func<int, int> ComputeSomeValue = x =>
{
Thread.Sleep(10000);
return x * 3;
};
现在,我使用常规的LINQ查询作为基线:
var results =
from x in items
where Predicate(x)
select ComputeSomeValue(x);
这个计算耗费了50秒来得出以下结果:
![可枚举](https://istack.dev59.com/lQan7.webp)
然后我转而使用了一个observable(响应式框架)查询:
var results =
from x in items.ToObservable()
where Predicate(x)
from y in Observable.Start(() => ComputeSomeValue(x))
select y;
获取这个结果需要10秒钟:
![observable](https://istack.dev59.com/qMXAp.webp)
很明显是在并行计算。
然而,结果是无序的。因此我将查询更改为:
var query =
from x in items.ToObservable()
where Predicate(x)
from y in Observable.Start(() => ComputeSomeValue(x))
select new { x, y };
var results =
query
.ToEnumerable()
.OrderBy(z => z.x)
.Select(z => z.y);
这样仍然需要10秒钟,但是我按照正确的顺序得到了结果。
现在,唯一的问题在于WithDegreeOfParallelism
。这里有几个尝试的方法。
首先,我将代码更改为生成10,000个值,每个值的计算时间为10毫秒。我的标准LINQ查询仍然需要50秒。但是响应式查询只需要6.3秒。如果它可以同时执行所有计算,那么所需时间应该更少。这表明它正在达到异步管道的最大值。
第二点是,响应式框架使用调度程序来进行所有工作调度。您可以尝试使用各种与响应式框架一起提供的调度程序来查找替代方案,如果内置的调度程序不能满足您的要求,则甚至可以编写自己的调度程序来执行任何调度操作。
以下是一个并行计算谓词的查询版本:
var results =
from x in items.ToObservable()
from p in Observable.Start(() => Predicate(x))
where p
from y in Observable.Start(() => ComputeSomeValue(x))
select new { x, y };