为什么这些Rx序列是无序的?

5
假设我写下以下内容:
var gen = Observable.Range(1, 3)
  .SelectMany(x => Observable.Range(1, x));

生成的序列是 1 1 2 1 2 3,与预期相符。但现在如果我写下以下代码:
var gen = Observable.Range(1, 4)
  .SelectMany(x => Observable.Range(1, x));

现在生成的序列是1 1 2 1 2 1 3 2 3 4,而不是我们期望的1 1 2 1 2 3 1 2 3 4。这是为什么?SelectMany()是否进行了某种多线程合并?

2
也许你应该使用不同的调度程序:Observable.Range(1, 4, ImmediateScheduler.Instance).SelectMany(x => Observable.Range(1, x, ImmediateScheduler.Instance)) 按顺序创建序列。 - Nico
2
SelectMany本身不涉及调度。默认情况下,.Range使用CurrentThreadScheduler,该调度程序会排队操作,但不是多线程的。通常,在序列中,Rx不会按顺序进行,除非您通过操作使它们按顺序进行 - 这正是它们可观察而不是可枚举的原因。 - Jeroen Mostert
那么 Observable.Range() 不能保证按顺序生成事件吗? - Dmitri Nesteruk
@JeroenMostert 不,Rx中的序列肯定是有序的,除非你做了一些使它们无序的事情。可观察对象和可枚举对象之间的区别在于推送与拉取。可枚举对象在客户端请求时从源获取数据,而可观察对象在准备好数据时向客户端推送数据。 - Brandon Kramer
1
@BrandonKramer:单独的序列是有顺序的。但一旦你开始组合不同的序列,就需要更加关注正在发生的事情——虽然有顺序,但它可能并不直观,甚至不确定(如果涉及到线程调度器)。这就是我的意思。 - Jeroen Mostert
@JeroenMostert 我明白你的意思了,我太过于字面理解你所说的话了。在这种澄清下,我同意你的观点。 - Brandon Kramer
1个回答

8

Observable.Range() 本身总是按顺序生成其事件。然而,SelectMany() 在开始下一个 observable 之前不会等待前一个 observable 完成。这意味着随着序列变得越来越长,重叠的部分也会越来越多,因为下一个序列将在前一个序列完成之前启动。

如果您想让输出按顺序进行,那么您需要使用不同的序列展开方法,例如Concat()

例如:

var gen = Observable.Range(1, 4)
  .Select(x => Observable.Range(1, x)).Concat();

输出结果: 1,1,2,1,2,3,1,2,3,4

SelectMany()不同,Concat()会等待每个序列完成后再开始下一个序列。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接