Spring Boot Webflux/Netty - 检测关闭的连接

14

我一直在使用spring-boot 2.0.0.RC1和webflux starter (spring-boot-starter-webflux)。我创建了一个简单的控制器,返回一个无限流。我希望只有在有客户端(订阅者)时,发布者才执行其工作。假设我有一个像下面这样的控制器:

@RestController
public class Demo {

    @GetMapping(value = "/")
    public Flux<String> getEvents(){
        return Flux.create((FluxSink<String> sink) -> {

            while(!sink.isCancelled()){

                // TODO e.g. fetch data from somewhere

                sink.next("DATA");
            }
            sink.complete();
        }).doFinally(signal -> System.out.println("END"));
    }

}

现在,当我尝试运行该代码并使用Chrome访问端点http://localhost:8080/时,我可以看到数据。但是,一旦关闭浏览器,while循环会继续运行,因为没有取消事件被触发。如何在关闭浏览器后立即终止/取消流?

从这个答案中我引用:

目前使用HTTP时,确切的背压信息不会通过网络传输,因为HTTP协议不支持此操作。如果我们使用不同的线路协议,则可能会更改此设置。

我认为,由于HTTP协议不支持背压,这意味着也不会发出取消请求。

进一步调查,通过分析网络流量,显示浏览器在关闭后立即发送TCP FIN。是否有一种方法可以配置Netty(或其他内容),以便半关闭的连接将触发发布者上的取消事件,使while循环停止?

还是我必须编写类似于org.springframework.http.server.reactive.ServletHttpHandlerAdapter的自己的适配器,在其中实现自己的订阅者?

感谢任何帮助。

编辑: 如果没有客户端,则在尝试向套接字写数据时会引发IOException。如你可以在堆栈跟踪中看到的那样。

但是这还不够好,因为在下一个数据块准备好发送之前可能需要一段时间,因此要检测已离开的客户端需要相同的时间。正如Brian Clozel在答案中指出的那样,这是Reactor Netty中已知的问题。我尝试通过将依赖项添加到POM.xml来改用Tomcat。像这样:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
虽然它替换了Netty并使用Tomcat,但由于浏览器未显示任何数据,因此它似乎不是响应式的。 但是,控制台中没有警告/信息/异常。截至此版本(2.0.0.RC1),spring-boot-starter-webflux是否应该与Tomcat一起使用?
2个回答

16

由于这是一个已知的问题(参见Brian Clozel的回答),我最终使用了一个Flux来获取我的真实数据,并使用另一个Flux来实现某种ping/心跳机制。结果,我使用Flux.merge()将两者合并在一起。

在这里,您可以看到我的解决方案的简化版本:

@RestController
public class Demo {

    public interface Notification{}

    public static class MyData implements Notification{
        …
        public boolean isEmpty(){…}
    }

    @GetMapping(value = "/", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<? extends Notification>> getNotificationStream() {
        return Flux.merge(getEventMessageStream(), getHeartbeatStream());
    }

    private Flux<ServerSentEvent<Notification>> getHeartbeatStream() {
        return Flux.interval(Duration.ofSeconds(2))
                .map(i -> ServerSentEvent.<Notification>builder().event("ping").build())
                .doFinally(signalType ->System.out.println("END"));
    }

    private Flux<ServerSentEvent<MyData>> getEventMessageStream() {
        return Flux.interval(Duration.ofSeconds(30))
                .map(i -> {

                    // TODO e.g. fetch data from somewhere,
                    // if there is no data return an empty object

                    return data;
                })
                .filter(data -> !data.isEmpty())
                .map(data -> ServerSentEvent
                        .builder(data)
                        .event("message").build());
    }
}

我用ServerSentEvent<? extends Notification>来包装所有的内容,其中Notification只是一个标记接口。我使用ServerSentEvent类中的event字段来区分数据和ping事件。由于心跳Flux会以短时间间隔不断发送事件,因此检测客户端是否离线的时间最长为该时间间隔的长度。记住,我需要这个功能是因为在我获得可以发送的真实数据之前可能需要一段时间,因此也可能需要一段时间才能检测到客户端已经离线。这样一来,它将尽快检测到客户端已经离线,因为它无法发送ping(或可能是消息事件)。

关于标记接口Notification的最后一点说明。这并不是真正必要的,但它提供了一些类型安全性。如果没有它,我们可以将Flux<ServerSentEvent<?>>作为getNotificationStream()方法的返回类型,而不是Flux<ServerSentEvent<? extends Notification>>。另外,也可以使getHeartbeatStream()返回Flux<ServerSentEvent<MyData>>。然而,这样做将允许发送任何对象,这是我不想看到的。因此,我添加了这个接口。


1
我没有完全理解它的工作方式。一旦客户端断开连接,会因为异常销毁带有ping的流吗? - Volodymyr Masliy
我已经通过在客户端断开WiFi的方式进行了测试,但它没有起作用。然而,当我通过关闭浏览器或浏览器的选项卡进行测试时,它是可以正常工作的。那么,如何使其在断开客户端WiFi或互联网时也能正常工作呢? - Spring

3

我不确定为什么会出现这种情况,但我怀疑是因为选择了错误的生成操作符。我认为使用以下内容可以解决问题:

    return Flux.interval(Duration.ofMillis(500))
    .map(input -> {
        return "DATA";
    });

根据Reactor的参考文档,你可能正在遇到generatepush之间的关键区别(我相信使用generate的类似方法也可能有效)。

我的评论是指背压信息(一个Subscriber愿意接受多少个元素),但成功/错误信息通过网络进行通信。

根据您选择的Web服务器(Reactor Netty、Tomcat、Jetty等),关闭客户端连接可能会导致:

  • 在服务器端收到取消信号(我认为Netty支持这一点)
  • 当服务器在尝试写入已关闭的连接时收到错误信号(我认为Servlet规范没有提供该回调并且我们缺少取消信息)。

简而言之:你不需要做任何特殊处理,它应该已经被支持了,但是你的Flux实现可能是真正的问题所在。

更新:这是 Reactor Netty 中已知的问题


1
谢谢您的回答。实际上,当我按照您建议的方式实现Flux时,情况有所改善,这意味着它会在尝试发送下一个数据时立即检测到浏览器已关闭。但是,在我的用例中,可能需要一段时间才能产生新数据。因此,它也可能需要一段时间才能检测到客户端已离开。作为解决方法,我尝试了以下内容: return Flux.interval(Duration.ofMillis(500)) .map(input -> { return "DATA || null"; }).filter(x -> x != null); - Michael Stadler
但是当我过滤掉那些空值时,它直到尝试发送真实数据(例如非空值)才会检测到连接已关闭。 - Michael Stadler

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接