Spring Web-Flux中的反压机制

58

我是一个Spring Web-Flux的初学者。我编写了一个控制器,如下所示:

@RestController
public class FirstController 
{
    @GetMapping("/first")
    public Mono<String> getAllTweets() 
    {
        return Mono.just("I am First Mono")
    }
}

我知道响应式编程的其中一个好处是背压(Backpressure),它可以平衡请求或响应速率。我想了解如何在Spring Web-Flux中实现背压机制。


如何实现反压或如何处理它?您已经内置了处理反压的机制,所以您想了解它们吗? - Andrew Tobilko
如果是这样,请查看此页面 - Andrew Tobilko
我之前不知道它已经默认存在了,所以了解如何处理可能更好。我想了解Spring Web-Flux中的机制及其配置。 - Sam
2
@Andrew Tobilko RxJava与Reactor不同,后者是Spring WebFlux的核心。 - gstackoverflow
1个回答

112

WebFlux中的反压力

为了理解WebFlux框架当前实现中的反压力是如何工作的,我们必须回顾默认使用的传输层。正如我们所知道的那样,浏览器和服务器之间的普通通信(服务器到服务器的通信通常也是一样的)是通过TCP连接完成的。WebFlux也使用该传输方式进行客户端和服务器之间的通信。 那么,为了理解“反压控制”术语的含义,我们必须从响应式流规范的角度回顾什么是反压。

基本语义定义了如何通过反压力调节流元素的传输。

因此,从这个声明中,我们可以得出结论,在响应式流中,反压是一种机制,通过传输(通知)接收方可以消耗多少元素来调节需求;在这里有一个棘手的问题。TCP具有字节抽象而不是逻辑元素抽象。我们通常想要通过反压力控制发送/接收到/从网络接收的逻辑元素数量。尽管TCP有其自己的流控制(请参见此处的含义和那里的动画),但这种流控制仍然是针对字节而不是逻辑元素的。

在WebFlux模块的当前实现中,反压由传输流控制调节,但它不会公开接收方的真正需求。为了最终看到交互流,请参见以下图表:

enter image description here

为了简单起见,上面的图表展示了两个微服务之间的通信,其中左侧的微服务发送数据流,右侧的微服务消费该流。以下编号列表提供了该图表的简要说明:
1. 这是WebFlux框架,它负责将逻辑元素转换为字节,并将它们传输/接收到/从TCP(网络)中。 2. 这是长时间运行的处理开始,一旦作业完成,请求下一个元素。 3. 在这里,当业务逻辑没有需求时,WebFlux会将来自网络的字节排队,而不需要它们的确认(业务逻辑没有需求)。 4. 由于TCP流量控制的性质,服务A仍然可能向网络发送数据。
如上图所示,接收方暴露的需求与发送方的需求(逻辑元素中的需求)不同。这意味着两者的需求是隔离的,只适用于WebFlux <->业务逻辑(服务)交互,并且将Service A<->Service B交互的背压曝光得更少。所有这些意味着WebFlux中的背压控制不如我们所期望的那样公平。
但我仍然想知道如何控制背压。
如果我们仍然想在WebFlux中进行不公平的背压控制,我们可以利用Project Reactor运算符(例如limitRate())的支持来实现。以下示例显示了如何使用该运算符:
@PostMapping("/tweets")
public Mono<Void> postAllTweets(Flux<Tweet> tweetsFlux) {
    
    return tweetService.process(tweetsFlux.limitRate(10))
                       .then();
}

从这个例子中可以看出,limitRate()操作符允许定义一次预取的元素数量。这意味着即使最终的订阅者请求Long.MAX_VALUE个元素,limitRate操作符也会将该需求拆分成块,并且不允许一次性消耗超过该数量的元素。我们也可以对元素发送过程执行相同的操作:

@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {
    
    return tweetService.retreiveAll()
                       .limitRate(10);
}

上面的示例显示,即使WebFlux一次请求了超过10个元素,limitRate()也会将需求限制为预取大小,并防止一次性消耗超过指定数量的元素。
另一个选项是实现自己的Subscriber或扩展Project Reactor中的BaseSubscriber。例如,以下是我们如何实现的一个简单示例:
class MyCustomBackpressureSubscriber<T> extends BaseSubscriber<T> {

    int consumed;
    final int limit = 5;

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        request(limit);
    }
    
    @Override
    protected void hookOnNext(T value) {
        // do business logic there 

        consumed++;
        
        if (consumed == limit) {
            consumed = 0;
            
            request(limit);
        }
    }
}

RSocket协议实现公平的背压

为了通过网络边界实现逻辑元素的背压,我们需要一个适当的协议。幸运的是,有一个叫做RSocket协议的协议。RSocket是一种应用层协议,允许通过网络边界传输真正的需求。

有一个RSocket-Java协议的实现,可以设置一个RSocket服务器。在服务器对服务器通信的情况下,同样的RSocket-Java库也提供了客户端实现。要了解如何使用RSocket-Java,请参见以下示例这里

对于浏览器和服务器之间的通信,有一个RSocket-JS实现,它允许通过WebSocket连接浏览器和服务器之间的流式通信。

基于RSocket的已知框架

现在有几个基于RSocket协议构建的框架。

Proteus

其中一个框架是Proteus项目,它提供了基于RSocket的完整微服务。此外,Proteus与Spring框架集成良好,因此现在我们可以实现公平的背压控制(请参见这里的示例)。

进一步阅读


1
尽管答案非常好,但我认为OP正在询问处理背压的编程方法(更高级别、API相关)。 - Andrew Tobilko
让我添加它。稍等一下。 - Oleh Dokuka
@OlehDokuka 感谢您的好解释。您能告诉我扩展 BaseSubscriber 是一种常见的方式,还是 Spring Web-Flux 的默认行为更好? - Sam
最好使用框架控制。但是,您可以使用limitRate进行一些限制,仅在需要细粒度控制的情况下才可以扩展“BaseSubscriber”。在许多情况下,这取决于您真正想要实现的目标。我建议您在Reactor gitter聊天室中进行对话并进行正常聊天-> https://gitter.im/reactor/reactor - Oleh Dokuka
@gstackoverflow图表上的确认消息是什么意思?为什么只有服务A才确认服务B?请参阅TCP流量控制机制-> https://www.brianstorti.com/tcp-flow-control/ - Oleh Dokuka
显示剩余14条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接