立即交付第一项,“防抖”后续项目。

47

考虑以下使用情况:

  • 需要尽快交付第一项
  • 需要将以下事件与1秒超时去抖动

我最终实现了基于OperatorDebounceWithTime的自定义操作符,然后像这样使用它

.lift(new CustomOperatorDebounceWithTime<>(1, TimeUnit.SECONDS, Schedulers.computation()))

CustomOperatorDebounceWithTime会立即发送第一个项目,然后使用OperatorDebounceWithTime运算符的逻辑来防抖后续项目。

有没有更简单的方法来实现所描述的行为?让我们跳过compose运算符,因为它无法解决问题。我正在寻找一种在不实现自定义运算符的情况下实现此目的的方法。

15个回答

43

更新:
根据@lopar的评论,更好的方法是:

Observable.from(items).publish(publishedItems -> publishedItems.limit(1).concatWith(publishedItems.skip(1).debounce(1, TimeUnit.SECONDS)))

这个方案是否可行:

String[] items = {"one", "two", "three", "four", "five"};
Observable<String> myObservable = Observable.from(items);
Observable.concat(
  myObservable.first(), 
  myObservable.skip(1).debounce(1, TimeUnit.SECONDS)
).subscribe(s -> 
  System.out.println(s));

6
您可以使用 publish 来避免订阅两次(在冷的可观察情况下会做两倍的工作,在热的情况下可能会失去同步)。示例如下:Observable.from(items).publish(publishedItems -> publishedItems.limit(1).concatWith(publishedItems.skip(1).debounce(1, TimeUnit.SECONDS)))注意,如果 Observable 为空,first 将会出错,因此,除非这是预期行为,否则您可能需要使用 limit(1) - lopar
2
您IP地址为143.198.54.68,由于运营成本限制,当前对于免费用户的使用频率限制为每个IP每72小时10次对话,如需解除限制,请点击左下角设置图标按钮(手机用户先点击左上角菜单按钮)。 - tomrozb
2
@LordRaydenMK,你的原始答案很好,不用担心是初学者! :) 很高兴我能帮助微调一下。 - lopar
3
当使用发布表单时,您不应该使用.skip(1)。因为观察者正在使用相同的订阅,它将从相同的位置继续。它应该只是Observable.from(items).publish(publishedItems -> publishedItems.limit(1).concatWith(publishedItems.debounce(1, TimeUnit.SECONDS))) - TrevorSStone
5
在版本2中,使用.take(1)代替.limit(1)。 - Ajay George
显示剩余2条评论

18

@LortRaydenMK和@lopar的答案是最好的,但是如果您或类似情况的人希望尝试其他方法,我想提出另一种建议。

有一种debounce()的变体,它采用一个函数来确定要针对特定项进行多长时间的去抖动(debounce)。它通过返回在一定时间后完成的observable来指定。您的函数可以为第一个项目返回empty(),然后为其余的返回timer()。类似以下代码(未经测试):

String[] items = {"one", "two", "three", "four", "five", "six"};
Observable.from(items)
    .debounce(item -> item.equals("one")
            ? Observable.empty()
            : Observable.timer(1, TimeUnit.SECONDS));

关键在于这个函数必须要知道哪个是第一个项目。你的序列可能知道,如果不知道,你可能需要使用zip()range()或其他方法。在这种情况下最好使用另一个答案中的解决方案。


应该是 TimeUnit.SECONDS(你漏掉了“S”)。不过这是我找到的唯一使用此函数的例子。 它确实帮助了我。非常感谢! - Warcello

11
使用RxJava 2.0的简单解决方案,翻译自RxJS相同问题的答案,它结合了throttleFirst和debounce,然后去除重复项。
private <T> ObservableTransformer<T, T> debounceImmediate() {
    return observable  -> observable.publish(p -> 
        Observable.merge(p.throttleFirst(1, TimeUnit.SECONDS), 
            p.debounce(1, TimeUnit.SECONDS)).distinctUntilChanged());
} 

@Test
public void testDebounceImmediate() {
    Observable.just(0, 100, 200, 1500, 1600, 1800, 2000, 10000)
        .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS).map(w -> v))
        .doOnNext(v -> System.out.println(LocalDateTime.now() + " T=" + v))
            .compose(debounceImmediate())
            .blockingSubscribe(v -> System.out.println(LocalDateTime.now() + " Debounced: " + v));
}

使用limit()或take()的方法似乎无法处理长时间存在的数据流,即我可能希望不断观察,但仍然要立即对首次出现的事件做出反应。

8

LordRaydenMK和lopar的回答存在一个问题:第二项总是会丢失。我认为之前没有人发布这个问题是因为如果有消抖,你通常有很多事件,并且第二个事件将在消抖时被丢弃。正确的方法是:

observable
    .publish(published ->
        published
            .limit(1)
            .concatWith(published.debounce(1, TimeUnit.SECONDS)));

不用担心,您不会收到任何重复的事件。如果您不确定,可以运行此代码并自行检查:

Observable.just(1, 2, 3, 4)
    .publish(published ->
        published
            .limit(1)
            .concatWith(published))
    .subscribe(System.out::println);

7
基于@lopar的评论的Kotlin扩展函数:
fun <T> Flowable<T>.debounceImmediate(timeout: Long, unit: TimeUnit): Flowable<T> {
    return publish {
        it.take(1).concatWith(it.debounce(timeout, unit))
    }
}

fun <T> Observable<T>.debounceImmediate(timeout: Long, unit: TimeUnit): Observable<T> {
    return publish {
        it.take(1).concatWith(it.debounce(timeout, unit))
    }
}

7
使用接受函数参数的debounce版本,并按照以下方式实现函数:
    .debounce(new Func1<String, Observable<String>>() {
        private AtomicBoolean isFirstEmission = new AtomicBoolean(true);
        @Override
        public Observable<String> call(String s) {
             // note: standard debounce causes the first item to be
             // delayed by 1 second unnecessarily, this is a workaround
             if (isFirstEmission.getAndSet(false)) {
                 return Observable.just(s);
             } else {
                 return Observable.just(s).delay(1, TimeUnit.SECONDS);
             }
        }
    })

第一个项目立即发出。后续项目将延迟一秒钟。如果延迟的可观察对象在下一个项目到达之前没有终止,它将被取消,因此实现了预期的去抖动行为。

2

Ngrx - rxjs解决方案,将管道分为两个部分

onMyAction$ = this.actions$
    .pipe(ofType<any>(ActionTypes.MY_ACTION);

lastTime = new Date();

@Effect()
onMyActionWithAbort$ = this.onMyAction$
    .pipe(
        filter((data) => { 
          const result = new Date() - this.lastTime > 200; 
          this.lastTime = new Date(); 
          return result; 
        }),
        switchMap(this.DoTheJob.bind(this))
    );

@Effect()
onMyActionWithDebounce$ = this.onMyAction$
    .pipe(
        debounceTime(200),
        filter(this.preventDuplicateFilter.bind(this)),
        switchMap(this.DoTheJob.bind(this))
    );

2
阅读完这篇文章之后,我最终使用了throttleLatest操作符,以获得与我所寻求的即时去抖动非常相似的行为。

throttleLatest marble diagram

以下代码将立即发出第一个项目,然后每500毫秒检查新项目。只有在该500毫秒窗口内收到的最新事件将被发送出去。
observable.throttleLatest(500, TimeUnit.MILLISECONDS)

2

我的Dart解决方案:

extension StreamExt<T> on Stream<T> {
  Stream<T> immediateDebounce(Duration duration) {
    var lastEmit = 0;
    return debounce((event) {
      if (_now - lastEmit < duration.inMilliseconds) {
        lastEmit = _now;
        return Stream.value(event).delay(duration);
      } else {
        lastEmit = _now;
        return Stream.value(event);
      }
    });
  }
}

int get _now =>  DateTime.now().millisecondsSinceEpoch;

1

如果有人在2021年寻找这个:

@OptIn(FlowPreview::class)
fun <T> Flow<T>.debounceImmediate(timeMillis: Long): Flow<T> =
    withIndex()
        .onEach { if (it.index != 0) delay(timeMillis) }
        .map { it.value }

使用方法:

authRepository.login(loginDto)
                    .debounceImmediate(10000)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接