如何在Rx中交替缓存和流式传输实时数据流

10

我有两个流。一个是数据流(可以是任何类型),另一个是作为门的布尔流。我需要将它们合并成一个具有以下行为的流:

  • 当门打开时(最近的值为true),数据应该直接流过
  • 当门关闭时(最近的值为false),数据应该被缓冲,以便在下次门打开时释放为单个元素
  • 解决方案应保留所有数据元素和顺序

我不确定如何组合这些流。这是我一直在测试的输入:

// a demo data stream that emits every second
var dataStream = Observable.Interval(TimeSpan.FromSeconds(1));

// a demo flag stream that toggles every 5 seconds
var toggle = false;
var gateStream = Observable.Interval(TimeSpan.FromSeconds(5))
    .Select(_ => toggle = !toggle);
1个回答

17
我会按照以下方式进行操作:
  • 使用门控流作为关闭选择器对数据流进行窗口化
  • 我们可以在门控流上使用DistinctUntilChanged来确保没有重复值
  • 我们还将强制门控流开始关闭(false)-它不会影响输出并允许一个巧妙的技巧
  • 然后使用带有索引号的Select重载函数。通过这个方法,我们可以确定是否需要缓冲或者直接发送窗口,因为我们知道偶数编号的窗口是用于缓冲的(因为我们确保了门控流从false开始)
  • 我们可以使用ToList()来缓冲每个偶数窗口,直到它关闭 - 这实际上等同于等待OnCompletedBuffer()
  • 我们使用自身的SelectMany来平坦化缓冲窗口
  • 最后,我们连接所有窗口以保证顺序被保留

代码如下:

dataStream.Window(gateStream.StartWith(false).DistinctUntilChanged())
          .Select((w, i) =>  i % 2 == 0 ? w.ToList().SelectMany(x => x) : w)
          .Concat()
          .Subscribe(Console.WriteLine);

3
我看到的最酷的处方已经很久了。 - Lee Campbell

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接