对于旧版本的RxJs,我编写了一个名为concatLatest
的操作符,它可以完成大部分你所需的操作。使用它,您可以通过以下代码获得您所需的节流行为:
const delay = Rx.Observable.empty().delay(500);
inputObservable
.map(value => Rx.Observable.of(value).concat(delay))
.concatLatest()
.subscribe(...);
这里是操作员。我尝试更新它以与RxJS5一起使用:
Rx.Observable.prototype.concatLatest = function () {
var source = this;
return Rx.Observable.create(function (observer) {
var latest,
isStopped,
isBusy,
outerSubscription,
innerSubscription,
subscriptions = new Rx.Subscription(function () {
if (outerSubscription) {
outerSubscription.unsubscribe();
}
if (innerSubscription) {
innerSubscription.unsubscribe();
}
}),
onError = observer.error.bind(observer),
onNext = observer.next.bind(observer),
innerOnComplete = function () {
var inner = latest;
if (inner) {
latest = undefined;
if (innerSubscription) {
innerSubscription.unsubscribe();
}
innerSubscription = inner.subscribe(onNext, onError, innerOnComplete);
}
else {
isBusy = false;
if (isStopped) {
observer.complete();
}
}
};
outerSubscription = source.subscribe(function (newInner) {
if (isBusy) {
latest = newInner;
}
else {
isBusy = true;
if (innerSubscription) {
innerSubscription.unsubscribe();
}
innerSubscription = newInner.subscribe(onNext, onError, innerOnComplete);
}
}, onError, function () {
isStopped = true;
if (!isBusy) {
observer.complete();
}
});
return subscriptions;
});
};
这是一个更新后的Plunkr:
https://plnkr.co/edit/DSVmSPRijJwj9msefjRi?p=preview
请注意,我已将您的lodash版本更新为最新版本。在lodash 4.7中,我重写了throttle/debounce操作符以解决一些边缘情况下的错误。您使用的是4.6.1版,该版本仍存在某些错误,尽管我不认为它们会影响您的测试。
throttle(() => Promise.resolve(...), { leading: true, trailing:true })
- 我发现这对于立即运行ajax放置并等待完成,然后在第一个完成时运行最终的放置系列特别有用。 - Josh Mc