RxJS 节流行为;立即获取第一个值

21

示例 Plunkr: https://plnkr.co/edit/NZwb3ol8CbZFtSc6Q9zm?p=preview

我知道 RxJS(5.0 beta.4)有这三种节流方法:

auditTime()throttleTime()debounceTime()

我要寻找的行为与 lodash 在默认情况下执行的 throttle 相同:

    1. 立即将第一个值给我!
    1. 在连续的值上,保持给定延迟的值,然后发出最后发生的值。
    1. 当节流延迟过期时,回到状态 (1)

理论上应该看起来像:

inputObservable
  .do(() => cancelPreviousRequest())
  .throttleTime(500)
  .subscribe((value) => doNextRequest(value))

但是

  • throttleTime如果在节流超时期间被发出,不会给我最后一个值
  • debounceTime不会立即触发
  • auditTime不会立即触发

我能否组合任何RxJS方法来实现所描述的行为?

3个回答

30

对于在2018年之后寻找此功能的任何人:此功能已经添加了一年多,但由于某种原因,文档尚未更新。

RxJS提交

您可以将配置对象直接传递给throttleTime。默认值为{ leading: true, trailing: false }。要实现此处讨论的行为,只需将trailing设置为true{ leading: true, trailing: true }

编辑:

为了完整起见,这里是一个有效的片段:

import { asyncScheduler } from 'rxjs'
import { throttleTime } from 'rxjs/operators'

...

observable.pipe(
  throttleTime(100, asyncScheduler, { leading: true, trailing: true })
)

3
如果您想基于除时间以外的其他条件来实现节流(例如ajax完成),则需要注意:您可以使用 throttle(() => Promise.resolve(...), { leading: true, trailing:true }) - 我发现这对于立即运行ajax放置并等待完成,然后在第一个完成时运行最终的放置系列特别有用。 - Josh Mc
这是否意味着auditTime已经过时了? - Pieter De Bie
请注意,这种方法的潜在缺点是您现在会发出每个时间间隔的第一个和最新的。 - Pinna_be

3

对于旧版本的RxJs,我编写了一个名为concatLatest的操作符,它可以完成大部分你所需的操作。使用它,您可以通过以下代码获得您所需的节流行为:

const delay = Rx.Observable.empty().delay(500);
inputObservable
    .map(value => Rx.Observable.of(value).concat(delay))
    .concatLatest()
    .subscribe(...);

这里是操作员。我尝试更新它以与RxJS5一起使用:

Rx.Observable.prototype.concatLatest = function () {
    /// <summary>
    /// Concatenates an observable sequence of observable sequences, skipping sequences that arrive while the current sequence is being observed.
    /// If N new observables arrive while the current observable is being observed, the first N-1 new observables will be thrown
    /// away and only the Nth will be observed.
    /// </summary>
    /// <returns type="Rx.Observable"></returns>
    var source = this;

    return Rx.Observable.create(function (observer) {
        var latest,
            isStopped,
            isBusy,
            outerSubscription,
            innerSubscription,
            subscriptions = new Rx.Subscription(function () {
              if (outerSubscription) {
                outerSubscription.unsubscribe();
              }
              if (innerSubscription) {
                innerSubscription.unsubscribe();
              }
            }),
            onError = observer.error.bind(observer),
            onNext = observer.next.bind(observer),
            innerOnComplete = function () {
                var inner = latest;
                if (inner) {
                    latest = undefined;
                    if (innerSubscription) {
                      innerSubscription.unsubscribe();
                    }
                    innerSubscription = inner.subscribe(onNext, onError, innerOnComplete);
                }
                else {
                    isBusy = false;
                    if (isStopped) {
                        observer.complete();
                    }
                }
            };

        outerSubscription = source.subscribe(function (newInner) {
            if (isBusy) {
                latest = newInner;
            }
            else {
                isBusy = true;
                if (innerSubscription) {
                  innerSubscription.unsubscribe();
                }
                innerSubscription = newInner.subscribe(onNext, onError, innerOnComplete);
            }
        }, onError, function () {
            isStopped = true;
            if (!isBusy) {
                observer.complete();
            }
        });

        return subscriptions;
    });
};

这是一个更新后的Plunkr: https://plnkr.co/edit/DSVmSPRijJwj9msefjRi?p=preview 请注意,我已将您的lodash版本更新为最新版本。在lodash 4.7中,我重写了throttle/debounce操作符以解决一些边缘情况下的错误。您使用的是4.6.1版,该版本仍存在某些错误,尽管我不认为它们会影响您的测试。

嗨@Brandon,非常好的答案。你能提供一个使用RxJS 4.x的concatLatest操作符的plunkr链接吗?(我本不会问,因为问题是特定于RxJS 5的,但你说你已经准备好了 :)) - HipsterZipster
@HipsterZipster,我为 RxJS 2 写了这个版本。 我认为它也适用于 4 版本:https://gist.github.com/bman654/92749cd93cdd84a540e41403f25a2105 - Brandon

2
我使用auditTime运算符并更改了其中的两行代码以实现所需的行为。

新的plunker: https://plnkr.co/edit/4NkXsOeJOSrLUP9WEtp0?p=preview

原始链接:

更改内容:

从(auditTime):

protected _next(value: T): void {
  this.value = value;
  this.hasValue = true;
  if (!this.throttled) {
    this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
  }
}

clearThrottle(): void {
  const { value, hasValue, throttled } = this;
  if (throttled) {
    this.remove(throttled);
    this.throttled = null;
    throttled.unsubscribe();
  }
  if (hasValue) {
    this.value = null;
    this.hasValue = false;
    this.destination.next(value);
  }
}

to (auditTimeImmediate):

protected _next(value: T): void {
    this.value = value;
    this.hasValue = true;
    if (!this.throttled) {
        // change 1:
        this.clearThrottle();
    }
}

clearThrottle(): void {
    const { value, hasValue, throttled } = this;
    if (throttled) {
        this.remove(throttled);
        this.throttled = null;
        throttled.unsubscribe();
    }
    if (hasValue) {
        this.value = null;
        this.hasValue = false;
        this.destination.next(value);
        // change 2:
        this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
    }
}

我在next操作后开始计时。

用法:

inputObservable
  .do(() => cancelPreviousRequest())
  .auditTimeImmediate(500)
  .subscribe((value) => doNextRequest(value))

2
有没有一种方法可以在最新版本中实现这一点,而不需要更改rxjs源代码? - Andreas Gassmann

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接