无限Java Stream和Reactor Flux有何区别?

15

我想了解无限流(infinite Stream)和无限Flux(infinite Flux)之间的概念差异(如果有的话)。

为此,我列举了下面这些无限流/Flux的示例:

@Test
public void infinteStream() {

  //Prints infinite number of integers
  Stream<Integer> infiniteStream = Stream.iterate(0, i -> i+1);
 
  infiniteStream.forEach(System.out::println);
}

@Test
public void infiniteFlux()  {
    
   //Prints infinite number of date strings (every second)
   Flux<LocalDateTime> localDateTimeFlux = Flux.interval(Duration.ofSeconds(1))
            .map(t -> LocalDateTime.now());

    localDateTimeFlux.subscribe(t -> System.out.println(t.format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"))));
}

关于这些例子,我有一个问题:是否有类似于Flux的infinteStream()(以及类似于Stream的infinteFlux())的模拟?并且,更一般地说,无限流和Flux之间是否有任何区别?

2个回答

29

StreamFlux 是非常不同的:

  • Stream 是单次使用,而你可以多次订阅 Flux
  • Stream 是拉取式的(消费一个元素调用下一个) vs. Flux 采用混合推/拉模型,其中发布者可以推送元素,但仍必须遵守由消费者发出的背压信号
  • Stream 是同步序列,而 Flux 可以表示异步序列

例如,您可以使用 Stream 生成无限值序列,它们尽可能快地生产和消费。在您的 Flux 示例中,您正在固定间隔时间内生成值(我不确定您是否可以使用 Stream 这样做)。使用 Flux,您还可以像您的 Stream 示例一样生成不带间隔的序列,通过使用Flux.generate方法。

总的来说,您可以将 Flux 视为 Stream + CompletableFuture 的混合体,具有:

  • 很多强大的操作符
  • 背压支持
  • 控制发布者和订阅者行为
  • 控制时间概念(缓冲窗口值,添加超时和回退等)
  • 专门针对通过网络获取的异步序列的功能(从数据库或远程 Web API)

亲爱的布莱恩,感谢你阐述这些差异!我会尝试使用Flux.generate来实现你的建议 :-) - Felix
3
在什么情况下应该使用流(Streams)而不是Flux? - user1955934

2

参考一下,同时我想出了一个能解决infiniteFlux()的Stream-Solution:

@Test 
public void infiniteFluxWithStream()  {

    Stream<Integer> infiniteStream = Stream.iterate(0, i -> i+1).peek(x->{
    LocalDateTime t = LocalDateTime.now();
    t.format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"));
    System.out.println(t);
    });

    infiniteStream.forEach(x->{
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    }); 

这确实很丑陋。然而,它表明在(非常)原则上,可以使用流的术语重写简单的Flux示例。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接