使用响应式扩展分割序列

3

关于 RX,我有一个小问题。我有一个从键盘输入的符号流,我需要将它们分成组。当流中出现 ';' 符号时,应该开始新的一组。简单来说,我需要一个操作符,类似于 Buffer,但它会在某个条件为真时触发,而不是经过一段时间延迟或事件计数后触发。有没有办法使用RX中已经存在的操作符构建它,或者我应该自己编写?

3个回答

3

这是一个源码。

var source = new[] { 'a', 'b', ';', 'c', 'd', 'e', ';' }.ToObservable();

这是您要求的内容:

以下是您所需的信息:

var groups = source
    // Group the elements by some constant (0)
    // and end the group when we see a semicolon
    .GroupByUntil(x => 0, group => group.Where(x => x == ';'))

以下是使用方法:

groups
    // Log that we're on the next group now.
    .Do(x => Console.WriteLine("Group: "))
    // Merge / Concat all the groups together
    // {{a..b..;}..{c..d..e..;}} => {a..b..;..c..d..e..;}
    .Merge()
    // Ignore the semicolons? This is optional, I suppose.
    .Where(x => x != ';')
    // Log the characters!
    .Do(x => Console.WriteLine("  {0}", x))
    // Make it so, Number One!
    .Subscribe();

输出:

Group:
  a
  b
Group:
  c
  d
  e

GroupByUntill会创建一个新的序列,该序列将在应用程序的生命周期内保持活动状态,我有什么遗漏吗? - L.E.O
@L.E.O,No。GroupBy将在外部订阅的生命周期内保留对每个新GroupedObservable的引用。GroupByUntil将为每个GroupedObservable执行相同的操作,或者直到该GroupedObservable被“until”选择器终止。对于此实现,只会保留一个GroupedObservable,因为我们正在按常量分组。 - cwharris
实际上,这与Nikolai的答案完成的事情完全相同,只是它只使用了一个运算符和一个订阅。 - cwharris
这有点说得通,尽管个人而言,我觉得缓冲运算符可能更适合作为这个操作符。 - L.E.O
实际上,缓冲操作符应该有一个“直到”重载,将每个缓冲作为参数传递。这里的“按常量分组”逻辑确实有点奇怪,但在我看来提供了最好的结果。 - cwharris

2
我们可以使用Buffer重写,其中边界可观测对象是我们的初始流,仅过滤出分号条目。
//this is our incoming stream
IObservable<char> keyboardStream;

//if the observable is cold we need to do this 
//in case of it being hot (as I would expect a keyboard stream to be) we do not need to do it
var persistedStream = keyboardStream.Publish().RefCount();

var bufferedBySemicolon = persistedStream.Buffer(persistedStream .Where(c=>c==';'));

这似乎很简单而且有趣。 - L.E.O
1
我倾向于不同意使用 RefCount 的大部分情况,除非不需要同步。当涉及同步时,最好明确连接,特别是在处理热流时。 - cwharris
具体来说,如果您的keyboardStream在不同于bufferedBySemicolon订阅的线程上被观察/生成,则在此处使用RefCount是不确定性的,因此会出现竞争条件。 - cwharris
感谢@Christopher Harris提醒我关于竞态条件问题(我并不是很理解)。尽管如此,与“GroupByUntil”答案相比,这个答案确实非常简短和简单,似乎使用了过度的运算符。如果观察者是热的,并且我们没有使用“.Publish().RefCount();”,是否仍然存在竞态条件,或者这个答案就是好的? - Stéphane Gourichon

2
这是 Nikolai 回答的非 RefCount 版本。这提供了更明确的订阅和处理同步,并且应该消除在观察源与您的消费者订阅的不同线程上时发生的竞争条件(当您处理 UI 时通常会出现这种情况)。
var groups = Observable.Create(o => {

    var publishedSource = source.Publish();

    return new CompositeDisposable(
        publishedSource.Buffer(publishedSource.Where(c => c == ';')).Subscribe(o),
        publishedSource.Connect()
        );

});

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接