WebFlux中的反压力
为了理解WebFlux框架当前实现中的反压力是如何工作的,我们必须回顾默认使用的传输层。正如我们所知道的那样,浏览器和服务器之间的普通通信(服务器到服务器的通信通常也是一样的)是通过TCP连接完成的。WebFlux也使用该传输方式进行客户端和服务器之间的通信。
那么,为了理解“反压控制”术语的含义,我们必须从响应式流规范的角度回顾什么是反压。
基本语义定义了如何通过反压力调节流元素的传输。
因此,从这个声明中,我们可以得出结论,在响应式流中,反压是一种机制,通过传输(通知)接收方可以消耗多少元素来调节需求;在这里有一个棘手的问题。TCP具有字节抽象而不是逻辑元素抽象。我们通常想要通过反压力控制发送/接收到/从网络接收的逻辑元素数量。尽管TCP有其自己的流控制(请参见此处的含义和那里的动画),但这种流控制仍然是针对字节而不是逻辑元素的。
在WebFlux模块的当前实现中,反压由传输流控制调节,但它不会公开接收方的真正需求。为了最终看到交互流,请参见以下图表:
为了简单起见,上面的图表展示了两个微服务之间的通信,其中左侧的微服务发送数据流,右侧的微服务消费该流。以下编号列表提供了该图表的简要说明:
1. 这是WebFlux框架,它负责将逻辑元素转换为字节,并将它们传输/接收到/从TCP(网络)中。
2. 这是长时间运行的处理开始,一旦作业完成,请求下一个元素。
3. 在这里,当业务逻辑没有需求时,WebFlux会将来自网络的字节排队,而不需要它们的确认(业务逻辑没有需求)。
4. 由于TCP流量控制的性质,服务A仍然可能向网络发送数据。
如上图所示,接收方暴露的需求与发送方的需求(逻辑元素中的需求)不同。这意味着两者的需求是隔离的,只适用于WebFlux <->业务逻辑(服务)交互,并且将Service A<->Service B交互的背压曝光得更少。所有这些意味着WebFlux中的背压控制不如我们所期望的那样公平。
但我仍然想知道如何控制背压。
如果我们仍然想在WebFlux中进行不公平的背压控制,我们可以利用Project Reactor运算符(例如
limitRate()
)的支持来实现。以下示例显示了如何使用该运算符:
@PostMapping("/tweets")
public Mono<Void> postAllTweets(Flux<Tweet> tweetsFlux) {
return tweetService.process(tweetsFlux.limitRate(10))
.then();
}
从这个例子中可以看出,limitRate()
操作符允许定义一次预取的元素数量。这意味着即使最终的订阅者请求Long.MAX_VALUE
个元素,limitRate
操作符也会将该需求拆分成块,并且不允许一次性消耗超过该数量的元素。我们也可以对元素发送过程执行相同的操作:
@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {
return tweetService.retreiveAll()
.limitRate(10);
}
上面的示例显示,即使WebFlux一次请求了超过10个元素,
limitRate()
也会将需求限制为预取大小,并防止一次性消耗超过指定数量的元素。
另一个选项是实现自己的
Subscriber
或扩展Project Reactor中的
BaseSubscriber
。例如,以下是我们如何实现的一个简单示例:
class MyCustomBackpressureSubscriber<T> extends BaseSubscriber<T> {
int consumed;
final int limit = 5;
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(limit);
}
@Override
protected void hookOnNext(T value) {
consumed++;
if (consumed == limit) {
consumed = 0;
request(limit);
}
}
}
RSocket协议实现公平的背压
为了通过网络边界实现逻辑元素的背压,我们需要一个适当的协议。幸运的是,有一个叫做RSocket协议的协议。RSocket是一种应用层协议,允许通过网络边界传输真正的需求。
有一个RSocket-Java协议的实现,可以设置一个RSocket服务器。在服务器对服务器通信的情况下,同样的RSocket-Java库也提供了客户端实现。要了解如何使用RSocket-Java,请参见以下示例这里。
对于浏览器和服务器之间的通信,有一个RSocket-JS实现,它允许通过WebSocket连接浏览器和服务器之间的流式通信。
基于RSocket的已知框架
现在有几个基于RSocket协议构建的框架。
Proteus
其中一个框架是Proteus项目,它提供了基于RSocket的完整微服务。此外,Proteus与Spring框架集成良好,因此现在我们可以实现公平的背压控制(请参见这里的示例)。
进一步阅读