合并运算符Concat和Merge的区别

59

我正在查看RXJava的文档,发现concat和merge操作符似乎做相同的事情。我写了一些测试来确认。

@Test
public void testContact() {

    Observable.concat(Observable.just("Hello"),
                      Observable.just("reactive"),
                      Observable.just("world"))
              .subscribe(System.out::println);
}

@Test
public void testMerge() {

    Observable.merge(Observable.just("Hello"),
                      Observable.just("reactive"),
                      Observable.just("world"))
            .subscribe(System.out::println);
}

文档说:

Merge操作符也是类似的。它结合了两个或多个Observables的发射,但是可能会交错它们,而Concat从不交错来自多个Observables的发射。

但我仍然不完全理解,执行此测试数千次后,合并结果始终相同。由于顺序不能保证,例如有时我期望是"reactive""world""hello"。

代码在这里:https://github.com/politrons/reactive

3个回答

154

正如您引用的文档所述 - 合并可以交错输出,而连接将在处理后续流之前首先等待较早的流完成。 在您的情况下,由于只有单个元素和静态流,这不会产生任何实质性的区别(但理论上,合并可以以随机顺序输出单词,仍然符合规范)。如果您想看到差异,请尝试以下操作(之后需要添加一些休眠时间以避免过早退出)

    Observable.merge(
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
    .subscribe(System.out::println);

A0 B0 A1 B1 B2 A2 B3 A3 B4 A4

对比

    Observable.concat(
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
    .subscribe(System.out::println);

A0 A1 A2 A3 A4 A5 A6 A7 A8

由于流A从未完成,因此Concat永远不会开始打印B。

s/stream/observable/g ;)

文档提供了漂亮的图表来展示它们之间的区别。需要记住的是合并操作并不能保证交错每个项, 它只是可能性之一。

Concat

Concat操作符 Merge

Merge操作符


37
请注意,如果来源是同步的,则 merge = concat - Dave Moten
好观点!这解释了为什么我的示例总是按相同的顺序进行。那就是让我感到困惑的部分。由于Dave的示例是interval(async),因此它是可重现的。 - paul
2
@ArturBiesiadowski,您能否在您的回答中添加“zip”? - Jared Burrows
@DaveMotenby 举个例子,如果我的一个数据源在网络上,另一个在数据库中,那么它们都是异步的,对吧? - pratham kesarkar
有人能解释一下什么是“同步源”吗? - Abhishek Kumar
1
@AbhishekKumar 同步意味着源代码在完成之前不会放弃控制权。 - Julian A.

8

合并操作(Concat)

合并操作(Concat)将两个或多个Observables的发射物按顺序进行合并,不会交错发射。这意味着它将先发射第一个Observable的所有发射物,然后再依次发射第二个Observable的所有发射物,以此类推。

合并操作符

下面通过一个例子来更清晰地理解。

final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};

final Observable<String> observableFirst = Observable.fromArray(listFirst);
final Observable<String> observableSecond = Observable.fromArray(listSecond);

Observable.concat(observableFirst, observableSecond)
        .subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

由于使用了连接运算符,所以它会保持顺序并按照A1,A2,A3,A4,B1,B2,B3的顺序发出值。

合并

合并通过合并它们的发射将多个Observables合并为一个。在发出项时,它不会保持顺序。

Merge Operator

让我们通过一个例子来清楚地理解它。

final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};

final Observable<String> observableFirst = Observable.fromArray(listFirst);
final Observable<String> observableSecond = Observable.fromArray(listSecond);

Observable.merge(observableFirst, observableSecond)
        .subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

由于我们使用了Merge操作符,它不会保持顺序并且可以以任何顺序发出值,例如A1、B1、A2、A3、B2、B3、A4A1、A2、B1、B2、A3、A4、B3或其他任何顺序。

这就是在RxJava中应根据用例使用Concat和Merge操作符的方式。


2
如果两个源是同步的,正如OP所说,它们通过mergeconcat返回完全相同的顺序。
但是,当源不同步时,例如数据库和套接字等两个源以随机或不确定的延迟推送数据时,mergeconcat的数据顺序是不同的。
fun <T> Observable<T>.getAsyncItems(): Observable<T> {

    return concatMap {

        val delay = Random().nextInt(10)

        Observable.just(it)
                .delay(delay.toLong(), TimeUnit.MILLISECONDS)
    }
}

这个扩展函数会随机地给每个发射添加延迟,但它们总是按相同的顺序发射。如果数据是A、B、C、D和E,则输出始终为A、B、C、D和E。
现在让我们比较一下当源是异步的时,merge和concat的行为。
val source1 =
        Observable.just("A", "B", "C", "D", "E").getAsyncItems()
val source2 =
        Observable.just(1, 2, 3, 4, 5).getAsyncItems()

Observable.merge(source1, source2).subscribe {print("$it-")}

打印类似于 A-B-1-C-2-D-3-4-E-5-A-1-2-3-B-C-D-4-E-5-,这取决于随机延迟。

Observable.concat(source1, source2).subscribe {print("$it-")}

无论运行多少次,都会打印出A-B-C-D-E-1-2-3-4-5-

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接