我正在使用Rx的BufferWithTime()方法批量处理消息。如果我的OnNext方法处理时间超过了指定的时间间隔,是否会导致两个并发的OnNext方法被调用?
换句话说,BufferWithTime()方法中指定的时间间隔是绝对时间还是在OnNext方法返回后作为“异步休眠”执行?
换句话说,BufferWithTime()方法中指定的时间间隔是绝对时间还是在OnNext方法返回后作为“异步休眠”执行?
OnNext
的调用将会被排队。下面的代码在缓冲时间到期时写入跟踪信息,并且在向订阅者调用OnNext
的开始/结束时(其中它会睡眠4倍于缓冲时间)也会写入跟踪信息:Observable.Interval(TimeSpan.FromMilliseconds(50), Scheduler.TaskPool)
.Take(50)
.BufferWithTime(TimeSpan.FromMilliseconds(100))
.Do(_ => Trace.WriteLine("Buffer elapsed"))
.ObserveOn(Scheduler.TaskPool)
.Subscribe(_ =>
{
Trace.WriteLine("Begin OnNext");
Thread.Sleep(200);
Trace.WriteLine("End OnNext");
});
OnNext
调用期间发生了两次Buffer elapsed
,Begin/End OnNext也不会重叠:
缓冲区已过期
开始OnNext
缓冲区已过期
缓冲区已过期
结束OnNext
开始OnNext
缓冲区已过期
缓冲区已过期
缓冲区已过期
结束OnNext
开始OnNext
结束OnNext
开始OnNext
结束OnNext
开始OnNext
结束OnNext
开始OnNext
结束OnNext