通过SSE订阅的Flux会触发cancel()事件。

3

我有一个使用了Thymeleaf Reactive的Spring Boot 2.0.0.M7 + Spring Webflux应用程序。

当我调用返回SSE模式(text/event-stream)下数据流的端点时,我注意到在我的微服务中,即使该流已正确处理,也会发生cancel()。

例如,这是一个简单的控制器端点:

@GetMapping(value = "/posts")
public Flux<String> getCommunityPosts() {
    return Flux.just("A", "B", "C").log("POSTS");
}

这是我在SSE模式下请求时收到的已订阅流日志:

2018-02-13 17:04:09.841  INFO 4281 --- [nio-9090-exec-4] POSTS : | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
2018-02-13 17:04:09.841  INFO 4281 --- [nio-9090-exec-4] POSTS : | request(1)
2018-02-13 17:04:09.842  INFO 4281 --- [nio-9090-exec-4] POSTS : | onNext(A)
2018-02-13 17:04:09.847  INFO 4281 --- [nio-9090-exec-4] POSTS : | request(1)
2018-02-13 17:04:09.847  INFO 4281 --- [nio-9090-exec-4] POSTS : | onNext(B)
2018-02-13 17:04:09.848  INFO 4281 --- [nio-9090-exec-4] POSTS : | request(1)
2018-02-13 17:04:09.848  INFO 4281 --- [nio-9090-exec-4] POSTS : | onNext(C)
2018-02-13 17:04:09.849  INFO 4281 --- [nio-9090-exec-4] POSTS : | request(1)
2018-02-13 17:04:09.849  INFO 4281 --- [nio-9090-exec-4] POSTS : | onComplete()
2018-02-13 17:04:09.852  INFO 4281 --- [nio-9090-exec-4] POSTS : | cancel()

在onComplete之后我们可以注意到取消事件。当我通过经典的GET请求调用相同的端点时,我没有这种行为。我怀疑这个取消事件会导致客户端事件源(JavaScript)抛出一个onError事件。这是SSE特有的已知/期望行为吗?
问题更新:
我有时需要我的事件源获取JSON数据而不是由Thymeleaf处理的HTML,因此我在一些流中使用SSE。我应该用另一种方式来实现吗?
我基于此示例的最后一种方法进行了实现: https://github.com/danielfernandez/reactive-matchday/blob/master/src/main/java/com/github/danielfernandez/matchday/web/controller/MatchController.java 但是,我可能错过了在我的上一篇帖子中提供一些信息。我使用Tomcat Server(8.5.23与M7),而不是Netty服务器。我强制Tomcat使用以下Maven依赖项:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>

在一个示例项目中使用您的代码,似乎引起了这个问题。
当我在Netty服务器上运行代码时,我得到了和你一样的结果。
2018-02-14 12:30:48.713  INFO 3060 --- [ctor-http-nio-2] reactor.Flux.ConcatMap.1 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2018-02-14 12:30:48.714  INFO 3060 --- [ctor-http-nio-2] reactor.Flux.ConcatMap.1 : request(1)
2018-02-14 12:30:49.717  INFO 3060 --- [     parallel-2] reactor.Flux.ConcatMap.1 : onNext(a)
2018-02-14 12:30:49.739  INFO 3060 --- [ctor-http-nio-2] reactor.Flux.ConcatMap.1 : request(31)
2018-02-14 12:30:50.731  INFO 3060 --- [     parallel-3] reactor.Flux.ConcatMap.1 : onNext(b)
2018-02-14 12:30:51.733  INFO 3060 --- [     parallel-4] reactor.Flux.ConcatMap.1 : onNext(c)
2018-02-14 12:30:51.735  INFO 3060 --- [     parallel-4] reactor.Flux.ConcatMap.1 : onComplete()

当我在Tomcat服务器上运行相同的代码时,我遇到了取消问题:
2018-02-14 12:33:18.294  INFO 3088 --- [nio-8080-exec-3] reactor.Flux.ConcatMap.2 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2018-02-14 12:33:18.295  INFO 3088 --- [nio-8080-exec-3] reactor.Flux.ConcatMap.2 : request(1)
2018-02-14 12:33:19.295  INFO 3088 --- [     parallel-4] reactor.Flux.ConcatMap.2 : onNext(a)
2018-02-14 12:33:19.297  INFO 3088 --- [     parallel-4] reactor.Flux.ConcatMap.2 : request(1)
2018-02-14 12:33:20.302  INFO 3088 --- [     parallel-5] reactor.Flux.ConcatMap.2 : onNext(b)
2018-02-14 12:33:20.302  INFO 3088 --- [     parallel-5] reactor.Flux.ConcatMap.2 : request(1)
2018-02-14 12:33:21.306  INFO 3088 --- [     parallel-6] reactor.Flux.ConcatMap.2 : onNext(c)
2018-02-14 12:33:21.306  INFO 3088 --- [     parallel-6] reactor.Flux.ConcatMap.2 : request(1)
2018-02-14 12:33:21.307  INFO 3088 --- [     parallel-6] reactor.Flux.ConcatMap.2 : onComplete()
2018-02-14 12:33:21.307  INFO 3088 --- [nio-8080-exec-4] reactor.Flux.ConcatMap.2 : cancel()

这可能是Tomcat的问题,还是我做错了什么?

你解决了这个问题吗?我也遇到了同样的问题(在Tomcat上)。 - breakline
1个回答

1
首先,我认为您不应该在有限流中使用SSE。
当我创建一个如下的控制器方法时:
@GetMapping(path = "/test", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<String> test() {
    return Flux.just("a", "b", "c").delayElements(Duration.ofSeconds(1)).log();
}

然后使用浏览器(Chrome或Firefox)发送请求:

<script type="text/javascript">
    var testEventSource = new EventSource("/test");
    testEventSource.onmessage = function (e) {
        console.log(e);
    };
</script>

我在服务器上得到以下日志:

| onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
| request(1)
| onNext(a)
| request(31)
| onNext(b)
| onNext(c)
| onComplete()
| onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
| request(1)
| onNext(a)
| request(31)
| onNext(b)
| onNext(c)
| onComplete()

一旦完成 Flux,服务器会关闭连接并自动重新连接浏览器。这将重复相同的序列一遍又一遍。
我在流期间只有当关闭浏览器标签时,服务器才会收到 cancel() 事件。

我根据你的代码编辑了我的初始帖子,加入了新的信息。我忘记提到我使用的是Tomcat服务器而不是Netty服务器。实际上,只有当我的代码在Tomcat服务器上运行时,我才会重现最终的cancel()事件。 - Sancho

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接