Spring Reactor Merge与Concat的区别

11

我正在使用Spring Reactor,但是我无法看到concatmerge操作符之间的区别。

这里是我的示例:

    @Test
    public void merge() {
        Flux<String> flux1 = Flux.just("hello").doOnNext(value -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux2 = Flux.just("reactive").doOnNext(value -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux3 = Flux.just("world");
        Flux.merge(flux1, flux2, flux3)
                .map(String::toUpperCase)
                .subscribe(System.out::println);
    }

    @Test
    public void concat() {
        Flux<String> flux1 = Flux.just("hello").doOnNext(value -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux2 = Flux.just("reactive").doOnNext(value -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux3 = Flux.just("world");
        Flux.concat(flux1, flux2, flux3)
                .map(String::toUpperCase)
                .subscribe(System.out::println);    
}

两者表现完全相同。有人能够解释一下这两个操作之间的区别吗?
4个回答

17

合并和连接的本质区别在于,在合并中,两个流都是活动的。在连接的情况下,第一个流被终止,然后将另一个流连接到它后面。

Concat enter image description here


Merge enter image description here


但是合并时发射的顺序为什么总是相同的呢? - paul
什么是序列? - Amriteya
Flux 1、2和3一直存在。 - paul
还有时间差吗? - Amriteya
不,从来没有,这就是我提问的原因。 - paul

13

API文档中已经提到了这种差异,即concat首先完全读取一个流,然后将第二个流附加到该流中,merge运算符不能保证两个流之间的顺序。

为了看到区别,请按以下方式修改您的合并(merge)()代码。

例如,下面是示例代码:

//Flux with Delay
@Test
public void merge() {
    Flux<String> flux1 = Flux.just("Hello", "Vikram");

    flux1 = Flux.interval(Duration.ofMillis(3000))
    .zipWith(flux1, (i, msg) -> msg);
    
    
    Flux<String> flux2 = Flux.just("reactive");
    flux2 = Flux.interval(Duration.ofMillis(2000))
            .zipWith(flux2, (i, msg) -> msg);
    
    Flux<String> flux3 = Flux.just("world");
    Flux.merge(flux1, flux2, flux3)
            .subscribe(System.out::println);

    try {
        Thread.sleep(8000);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

当您修改Flux.interval持续时间(目前设置为3000毫秒)时,您会发现使用merge()的输出会不断变化。但是使用concat(),输出将始终保持不变。


嗨@Vikram Rawat,您的示例输出是reactive,world,Hello,Vikram。如果没有任何睡眠,为什么不首先显示“world”这个单词? - AleGallagher
嗨@AleGallagher,发现得好..我已经编辑了代码..如果你运行它,输出将是world、reactive、Hello、Vikram。之前的代码在doOnNext中调用了一个sleep,这会让订阅者进入睡眠状态,而不是延迟从flux 2中发出"reactive"。所以,我改变了flux2,现在延迟了数据("reactive")的发射。希望这有意义。 - Vikram Rawat

3

另一个值得注意的区别是,所有的concat变体都在第一个流终止后才延迟订阅第二个流。

而所有merge变体都会急切地订阅发布方(所有发布方一起被订阅)

运行以下代码可以突出显示这个方面:

//Lazy subscription of conact
Flux.concat(Flux.just(1, 2, 3, 4).delayElements(Duration.ofMillis(500)),
                Flux.just(10, 20, 30, 40).delayElements(Duration.ofMillis(500)))
                .subscribe(System.out::println, System.out::println);



//Eager subscription of the merge. Also, try mergeSequential.
Flux.merge(Flux.range(500, 3).delayElements(Duration.ofMillis(500)),
                Flux.range(-500, 3).delayElements(Duration.ofMillis(300)))
                .subscribe(System.out::println, System.out::println);

0
1st example:  Flux<String> stringFlux= Flux.just("a","b").delayElements(Duration.ofMillis(200));
        Flux<String> stringFlux1= Flux.just("c","d").delayElements(Duration.ofMillis(300));
        Flux<String> stringFlux2=Flux.merge(stringFlux,stringFlux1);
                stringFlux2.subscribe(System.out::println);
                Thread.sleep(4000);
output: a
        c
        b
        d
2nd example: Flux<String> stringFlux= Flux.just("a","b");
        Flux<String> stringFlux1= Flux.just("c","d");
        Flux<String> stringFlux2=Flux.merge(stringFlux,stringFlux1);
                stringFlux2.subscribe(System.out::println);
                Thread.sleep(4000);
output: a a c
        c b a
        b c d 
        d d b
conclusion: if you dont give delay to the elements then publishers are subscribed eagerly and output would be random as in 2nd example case.
If you specify delay to the elements then in which publisher you provide less amount of time will subscribed first and emit element and then other publisher emit elements as in 1st example.
It totally depends upon time for example output for below would be :
c,d,a,b because there is big diff between 2 seconds and 300 milliseconds. But in 1st example there is less diff between milliseconds so output would be in **interleaving fashion**.
3rd example :
Flux<String> stringFlux= Flux.just("a","b").delayElements(Duration.ofSeconds(2));
Flux<String> stringFlux1= Flux.just("c","d").delayElements(Duration.ofMillis(300));

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接