RxJava对发出的列表中每个项目进行延迟

74

我在使用Rx时遇到了一些困难,我本以为应该很简单。

我有一个项目列表,我想要每个项目都被延迟后再发出。

似乎Rx delay() 操作符仅仅是将所有项目的发射时间推迟了指定的延迟时间,而没有针对每个单独的项目进行操作。

下面是一些测试代码。它将项目分组在一个列表中。然后每个组应该在发出之前都被延迟。

Observable.range(1, 5)
    .groupBy(n -> n % 5)
    .flatMap(g -> g.toList())
    .delay(50, TimeUnit.MILLISECONDS)
    .doOnNext(item -> {
        System.out.println(System.currentTimeMillis() - timeNow);
        System.out.println(item);
        System.out.println(" ");
    }).toList().toBlocking().first();

结果为:

154ms
[5]

155ms
[2]

155ms
[1]

155ms
[3]

155ms
[4]

但我希望看到的是这样的:

174ms
[5]

230ms
[2]

285ms
[1]

345ms
[3]

399ms
[4]

我做错了什么?


不知道为什么没有任何答案真正回答了这个问题。为什么它不起作用,出了什么问题? - eis
8
每个关于 Rx 的问题似乎都以“我正在努力实现我认为应该很简单的东西”作为开头。 :) - Steven Jeuris
17个回答

76

最简单的方法似乎就是使用 concatMap,并将每个项目包装在一个延迟的 Observable 中。

long startTime = System.currentTimeMillis();
Observable.range(1, 5)
        .concatMap(i-> Observable.just(i).delay(50, TimeUnit.MILLISECONDS))
        .doOnNext(i-> System.out.println(
                "Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) +"ms"))
        .toCompletable().await();

打印:

Item: 1, Time: 51ms
Item: 2, Time: 101ms
Item: 3, Time: 151ms
Item: 4, Time: 202ms
Item: 5, Time: 252ms

我更喜欢这种方法,因为我毫不怀疑我应该关闭某些东西。使用zip+interval,我认为我需要手动停止Interval的发射。 - MercurieVV
即使上游速度缓慢,也能完美运行。 - Stéphane Appercel
当我们知道需要多少结果时,这似乎可以很好地完成。但是,当我们不知道有多少结果时,有没有办法做到这一点? - Chris Sobolewski
concatMap 步骤适用于任何类型的 observable,有界或无界、大小固定或不固定。所给出的示例仅为了匹配问题和演示的便利而设定了大小。 - Magnus
toCompletable已被弃用。 - Johann

67

一种方法是使用zip将你的可观测对象与Interval可观测对象组合起来,以延迟输出。

Observable.zip(Observable.range(1, 5)
        .groupBy(n -> n % 5)
        .flatMap(g -> g.toList()),
    Observable.interval(50, TimeUnit.MILLISECONDS),
    (obs, timer) -> obs)
    .doOnNext(item -> {
      System.out.println(System.currentTimeMillis() - timeNow);
      System.out.println(item);
      System.out.println(" ");
    }).toList().toBlocking().first();

2
谢谢!我认为延迟运算符不是我想要使用的方式。这个解决方案有效 :) - athor
15
如果您的源可观察对象发射速度慢于50毫秒,那么这将行不通。Observable.interval()将每50毫秒发射一个项目。如果来自分组列表的项目没有匹配项,zip()运算符将对它们进行缓冲。然后,当组被发射时,zip将立即将其与间隔可观察对象的一个项目组合起来,无延迟地发送到您的doOnNext() - kjones
@kjones同意 - 这对于生产速度慢的情况不起作用,只能延迟那些同时可用的项目。但最初的问题是 - “我有一个项目列表,并且我希望每个项目都在延迟后发出。”这听起来像是使用from并在每次发射之间设置延迟。 - iagreen
1
@iagreen - 你说得对。我只是假设Observable.range(1,5)是一个虚拟的源observable。由于我所处理的几乎所有内容都可以在任意随机时间产生项目,因此我倾向于将这个要求投射到其他人身上。 - kjones
如果您不想包装初始的可观察对象,您可以像Mina Samy的答案中一样内联使用zipWith:https://dev59.com/E1wX5IYBdhLWcg3w2iou#41718758 - tir38
显示剩余3条评论

49

分享一种简单的方法,可以按间隔逐个发出集合中的每个项:

Observable.just(1,2,3,4,5)
    .zipWith(Observable.interval(500, TimeUnit.MILLISECONDS), (item, interval) -> item)
    .subscribe(System.out::println);

每个项目将会每500毫秒发出一次


23

针对Kotlin用户,我编写了一个“间隔压缩”方法的扩展函数

import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit

fun <T> Observable<T>.delayEach(interval: Long, timeUnit: TimeUnit): Observable<T> =
    Observable.zip(
        this, 
        Observable.interval(interval, timeUnit), 
        BiFunction { item, _ -> item }
    )

它的工作方式相同,但这使其可重复使用。例如:

Observable.range(1, 5)
    .delayEach(1, TimeUnit.SECONDS)

6
我觉得这正是你所需要的。看一下:
long startTime = System.currentTimeMillis();
Observable.intervalRange(1, 5, 0, 50, TimeUnit.MILLISECONDS)
                .timestamp(TimeUnit.MILLISECONDS)
                .subscribe(emitTime -> {
                    System.out.println(emitTime.time() - startTime);
                });

5
为每个发出的项目引入延迟是有用的:
List<String> letters = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));

Observable.fromIterable(letters)
                .concatMap(item -> Observable.interval(1, TimeUnit.SECONDS)
                        .take(1)
                        .map(second -> item))
                .subscribe(System.out::println);

https://github.com/ReactiveX/RxJava/issues/3505中有更多好的选项。


3

您可以实现一个自定义的rx运算符,例如MinRegularIntervalDelayOperator,然后使用lift函数来应用它。

Observable.range(1, 5)
    .groupBy(n -> n % 5)
    .flatMap(g -> g.toList())
    .lift(new MinRegularIntervalDelayOperator<Integer>(50L))
    .doOnNext(item -> {
      System.out.println(System.currentTimeMillis() - timeNow);
      System.out.println(item);
      System.out.println(" ");
    }).toList().toBlocking().first();

1

你可以使用

   Observable.interval(1, TimeUnit.SECONDS)
            .map(new Function<Long, Integer>() {
                @Override
                public Integer apply(Long aLong) throws Exception {
                    return aLong.intValue() + 1;
                }
            })
            .startWith(0)
            .take(listInput.size())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer index) throws Exception {
                    Log.d(TAG, "---index of your list --" + index);
                }
            });

上述代码不会重复值(索引)。“我确定”


1

有另外一种方法可以使用concatMap,因为concatMap会返回源项目的可观察对象。所以我们可以在该可观察对象上添加延迟。

这是我尝试过的方法。

Observable.range(1, 5)
          .groupBy(n -> n % 5)
          .concatMap(integerIntegerGroupedObservable ->
          integerIntegerGroupedObservable.delay(2000, TimeUnit.MILLISECONDS))
          .doOnNext(item -> {
                    System.out.println(System.currentTimeMillis() - timeNow);
                    System.out.println(item);
                    System.out.println(" ");
                }).toList().toBlocking().first(); 

1
一种不太优雅的方法是使用 .delay(Func1) 操作符使延迟随着迭代而改变。
Observable.range(1, 5)
            .delay(n -> n*50)
            .groupBy(n -> n % 5)
            .flatMap(g -> g.toList())
            .doOnNext(item -> {
                System.out.println(System.currentTimeMillis() - timeNow);
                System.out.println(item);
                System.out.println(" ");
            }).toList().toBlocking().first();

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接