PLINQ是否有异步版本?

15

我希望能够在处理项目时以一定的并行度对数据流执行查询。通常情况下,我会使用 PLINQ 实现这一点,但我的工作项不是 CPU 绑定,而是 IO 绑定,我想要使用异步 IO。PLINQ 不支持异步工作。

有什么聪明的方法可以运行类似于 PLINQ 的查询,但具有异步工作项?


以下是问题的更详细说明:

我的目标是以逻辑上描述如下查询的方式处理潜在无限的“项目”流:

var items = new int[10]; //simulate data

var results =
 from x in items.AsParallel().WithDegreeOfParallelism(100)
 where Predicate(x)
 select ComputeSomeValue(x);

foreach (var result in results)
 PerformSomeAction(result);

这个查询只是真实查询的草图。现在我希望每个占位函数都是异步的(返回一个Task,并在内部基于异步 IO)。

请注意,可能会有比内存中存储的更多的项目。我还必须控制并行度以最大化底层网络和磁盘硬件的利用率。

这个问题与多核无关。它完全适用于只有一个 CPU 核心的机器,因为 IO 仍然可以从并行性中受益。考虑慢速 Web 服务调用等情况。


2
+1,好问题。我想知道你是否可以利用IO端口完成来实现并行处理?免责声明:我在C++中经常使用它们,但从未在C#中使用过。 - Moo-Juice
2
你说过"工作项不是CPU绑定,而是IO绑定"。因此,大量的核心和CPU并行性并不能提供太多帮助。我的意思是,如果CPU使用率低而I/O使用率高,针对这两个操作创建n = 10个链接动作(ComputeSomeValue,然后是PerformSomeAction),并按顺序启动这些链。*新任务(ComputeSomeValue).ContinueWith(...)*等等。 - turdus-merula
你可以使用PLINQ(CPU并行)来应用你已经有的过滤器,但仅限于此...对于I/O部分,你必须启动任务...依我之见。 - turdus-merula
你看过 TPL Dataflow 吗?显然比原始的 PLINQ 更冗长,但似乎正是你所需要的。如果你厉害的话,可以创建一组 LINQ 绑定,以便可以用 LINQ 语法来表达它。 - lobsterism
1
@Noseratio 是的。我希望这个问题能够提供一些普遍的见解,并适用于所有具有类似“管道”结构的问题。 - usr
显示剩余3条评论
2个回答

6
这似乎需要使用微软的响应式框架。
我从以下代码作为我的初始变量开始:
var items = Enumerable.Range(0, 10).ToArray();

Func<int, bool> Predicate = x => x % 2 == 0;

Func<int, int> ComputeSomeValue = x =>
{
    Thread.Sleep(10000);
    return x * 3;
};

现在,我使用常规的LINQ查询作为基线:
var results =
    from x in items
    where Predicate(x)
    select ComputeSomeValue(x);

这个计算耗费了50秒来得出以下结果:

可枚举

然后我转而使用了一个observable(响应式框架)查询:

var results =
    from x in items.ToObservable()
    where Predicate(x)
    from y in Observable.Start(() => ComputeSomeValue(x))
    select y;

获取这个结果需要10秒钟:

observable

很明显是在并行计算。

然而,结果是无序的。因此我将查询更改为:

var query =
    from x in items.ToObservable()
    where Predicate(x)
    from y in Observable.Start(() => ComputeSomeValue(x))
    select new { x, y };

var results =
    query
        .ToEnumerable()
        .OrderBy(z => z.x)
        .Select(z => z.y);

这样仍然需要10秒钟,但是我按照正确的顺序得到了结果。

现在,唯一的问题在于WithDegreeOfParallelism。这里有几个尝试的方法。

首先,我将代码更改为生成10,000个值,每个值的计算时间为10毫秒。我的标准LINQ查询仍然需要50秒。但是响应式查询只需要6.3秒。如果它可以同时执行所有计算,那么所需时间应该更少。这表明它正在达到异步管道的最大值。

第二点是,响应式框架使用调度程序来进行所有工作调度。您可以尝试使用各种与响应式框架一起提供的调度程序来查找替代方案,如果内置的调度程序不能满足您的要求,则甚至可以编写自己的调度程序来执行任何调度操作。


以下是一个并行计算谓词的查询版本:

var results =
    from x in items.ToObservable()
    from p in Observable.Start(() => Predicate(x))
    where p
    from y in Observable.Start(() => ComputeSomeValue(x))
    select new { x, y };

我以前没有使用过Rx。谓词和ComputeSomeValue是否可以完全异步化(返回一个Task)?在您的示例中,这将是Task.Delay。 - usr
你可以使用响应式框架将 observables 转换为 tasks,也可以将 task 转换为 observables。但是根据我的经验,响应式框架生成的代码比 TPL 更加简洁。我建议你完全在响应式框架中进行计算。 - Enigmativity
@usr - 你也可以将对Predicate的调用变成可观察的调用,使其在并行运行。反应式框架很好地混合了查询所有部分的调度,因此执行效率很高。 - Enigmativity

1

此处所述,PLINQ 用于在多核/多处理器系统上并行运行 LINQ 查询。它与拥有许多磁盘单元和超级网络功能的酷系统没有太大关系。据我所知,它是为在更多的内核上运行可执行代码而不是同时向操作系统发送多个 I/O 请求。

也许您的 Predicate(x) 是 CPU 绑定的,因此您可以使用 PLINQ 执行过滤操作。但是您不能以相同的方式应用对 I/O 要求高的操作(ComputeSomeValuePerformSomeAction)。

您可以为每个项目定义一系列操作(在您的情况下为两个),并将该链(顺序执行?)分派给该链(请参见 延续任务)。

另外,您提到了一个"无限流的物品"。如果这些物品也由I/O生成,则可能听起来有点像生产者-消费者问题

也许您的问题并不太适合多核处理...它可能只是对I/O的需求较高而已...


2
CPU负载不重要。但我需要异步IO和非常高的并行度来最大化IO。该问题完全适用于只有一个CPU核心的机器,因为IO完成将被多路复用到一个核心上。我可以定义一个“操作链”,但是有太多的项目需要同时启动所有工作。我需要一种受限/保证的并行级别。 - usr
@usr,这听起来就像是“TPL Dataflow”。 - i3arnon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接