TPL数据流和Rx结合的示例

14

我只想学习它们以及如何将它们结合使用。我知道它们可以互相补充,但我找不到有人真正做到这一点的例子。


2
SO最适合针对特定问题的解决方案。"给我一个X的例子"并不是很适合这种情况。 - svick
@naeron84,看一个结合了C#异步和Rx的示例是否足够? - GregC
@GregC 实际上我想使用f#,但是async与TPL Dataflow块没有直接关系。 - naeron84
@naeron84,那我能否在你的问题上添加F#标签呢? - GregC
1
@svick 关于并行性和异步性的问题。如果我将它们混合在一起,会发生什么之类的事情。我开始觉得这个问题太琐碎或太难或毫无意义,我可能没有掌握非常基础的东西 :) - naeron84
显示剩余6条评论
1个回答

21
让我先介绍一下背景。.NET框架有许多特殊类型——无论是适当的类还是接口——Task<T>IObservable<T>Nullable<T>IEnumerable<T>Lazy<T>等,它们为底层类型T提供特殊能力。
TPL使用Task<T>来表示单个值T的异步计算。
Rx使用IObservable<T>来表示零个或多个值T的异步计算。
正是这两者的“异步计算”方面将TPL和Rx联系在一起。
现在,TPL还使用Task类型来表示异步执行Action lambda,但这可以视为Task<T>的特殊情况,其中Tvoid。就像C#中的标准方法返回void一样:
public void MyMethod() { }

Rx还可以使用一种称为Unit的特殊类型来处理同样的特殊情况。

TPL和Rx之间的区别在于返回值的数量。TPL只有一个,而Rx是零个或多个。

因此,如果您通过仅使用返回单个值的可观察序列来特别处理Rx,则可以以类似于TPL的方式进行一些计算。

例如,在TPL中,我可以编写:

Task.Factory
    .StartNew(() => "Hello")
    .ContinueWith(t => Console.WriteLine(t.Result));

在Rx中,相当于这个的是:
Observable
    .Start(() => "Hello")
    .Subscribe(x => Console.WriteLine(x));

我可以通过指定TPL来执行计算,从而在Rx中更进一步:

Observable
    .Start(() => "Hello", Scheduler.TaskPool)
    .Subscribe(x => Console.WriteLine(x));

默认情况下,线程池被使用。

现在我可以进行一些“混搭”。如果我添加一个对 System.Reactive.Threading.Tasks 命名空间的引用,我就可以轻松地在任务和可观察对象之间切换。

Task.Factory
    .StartNew(() => "Hello")
    .ToObservable()
    .Subscribe(x => Console.WriteLine(x));

Observable
    .Start(() => "Hello")
    .ToTask()
    .ContinueWith(t => Console.WriteLine(t.Result));

注意ToObservable().ToTask()的调用以及从一个库到另一个库的翻转。
如果我有一个返回多个值的可观察对象,我可以使用可观察对象的.ToArray()扩展方法将多个序列值转换为单个数组值,然后将其转换为任务。就像这样:
Observable
    .Interval(TimeSpan.FromSeconds(1.0))
    .Take(5) // is IObservable<long>
    .ToArray()
    .ToTask() // is Task<long[]>
    .ContinueWith(t => Console.WriteLine(t.Result.Length));

我认为这是对你问题的一个相当基础的回答。这是否符合你的期望?


21
TPL Dataflow是与TPL截然不同的库,因此我认为答案并没有准确回答问题。然而,这次讨论非常值得注意,所以点赞(+1)。 - GregC
很遗憾,.Net类型系统实际上不允许 Task<void> - svick
2
抱歉,但正如GregC之前所述,我需要一个涉及TPL Dataflow而不是“仅仅”TPL的示例。我想要的是将TPL Dataflow块与Rx结合使用。 - naeron84
抱歉 - 我不知道 DataFlow。很抱歉没有回答你的问题。 - Enigmativity
1
@svick - 缺少 Task<void>IObservable<Unit> 所要表示的。你不能使用 Task<Unit> 来代替 Task<void> 吗? - Enigmativity
大多数情况下是这样的,只是不太好。例如,您仍然需要在每个返回Unit的函数末尾明确地return Unit.Default; - svick

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接