Reactive Extensions中的BufferWithTime方法会重叠调用OnNext吗?

4
我正在使用Rx的BufferWithTime()方法批量处理消息。如果我的OnNext方法处理时间超过了指定的时间间隔,是否会导致两个并发的OnNext方法被调用?
换句话说,BufferWithTime()方法中指定的时间间隔是绝对时间还是在OnNext方法返回后作为“异步休眠”执行?
1个回答

4
不会的,对OnNext的调用将会被排队。下面的代码在缓冲时间到期时写入跟踪信息,并且在向订阅者调用OnNext的开始/结束时(其中它会睡眠4倍于缓冲时间)也会写入跟踪信息:
Observable.Interval(TimeSpan.FromMilliseconds(50), Scheduler.TaskPool)
    .Take(50)
    .BufferWithTime(TimeSpan.FromMilliseconds(100))
    .Do(_ => Trace.WriteLine("Buffer elapsed"))
    .ObserveOn(Scheduler.TaskPool)
    .Subscribe(_ =>
    {
        Trace.WriteLine("Begin OnNext");
        Thread.Sleep(200);
        Trace.WriteLine("End OnNext");
    });

输出如下。您可以看到,即使在一个OnNext调用期间发生了两次Buffer elapsed,Begin/End OnNext也不会重叠:

缓冲区已过期
开始OnNext
缓冲区已过期
缓冲区已过期
结束OnNext
开始OnNext
缓冲区已过期
缓冲区已过期
缓冲区已过期
结束OnNext
开始OnNext
结束OnNext
开始OnNext
结束OnNext
开始OnNext
结束OnNext
开始OnNext
结束OnNext


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接