如何处理Spring Boot 2在Angular 5中发布的JSON流

15

Spring Boot 2 WebFlux 在新版本中可以生成 JSON 流。

例如:

@GetMapping(value = "stream", produces = APPLICATION_STREAM_JSON_VALUE)
public Flux<Data> stream() {
    return Flux.interval(Duration.ofSeconds(1)).map(Data::new);
}

每秒将产生发布新数据

{"value":"1"}
{"value":"2"}
{"value":"3"}
{"value":"4"}
{"value":"5"}
{"value":"6"}

我尝试过Angular 5的HttpClient

findAll(): Observable<Data> {
   return this._http.get<Data>(this.url);
}

但是对于我来说它不起作用,因为我希望它是反应性的,它不会像缓存结果一样发送结果直到连接关闭

我想问在 Angular 5 中处理这个 JSON 的最佳方法是什么


请阅读“如何提问”:https://stackoverflow.com/help/how-to-ask。 你能告诉我们你已经尝试了什么吗?你想改进目前的做法吗? - ibenjelloun
好的,抱歉我已经更新了问题@ibenjelloun。 - ashraf revo
我认为你应该使用WebSocket而不是HttpClient。看一下这个教程:https://tutorialedge.net/typescript/angular/angular-websockets-tutorial/ - ibenjelloun
5个回答

8
据我所知,目前仍然没有官方解决方法(截至2018年8月19日),不过我已经找到了一些解决方法。每个 HttpClient 方法都有一个 config 参数,您可以在其中传递 responseType 和其他信息。我将这些设置混合在下面:
{observe: 'events', responseType: 'text', reportProgress: true}

然后您将收到指定类型的事件,范围为0到4。至少在我的情况下,type 3是有趣的内容,在partialText字段中,但要注意 - 在您的情况下,这些消息(在partialText字段中)将如下所示:

1条消息:

{"value":"1"}

2条消息:

{"value":"1"}
{"value":"2"}

3条消息

{"value":"1"}
{"value":"2"}
{"value":"3"}

所以,我已经像下面这样管理它:
method(url, /*if post - body,*/
      {observe: 'events', responseType: 'text', reportProgress: true})
      .pipe(
        filter(e => e.type === 3 && e.partialText),
        map(e => {
          const partials = e.partialText.trim().split('\n');
          return JSON.parse(partials.pop());
        })
      );

3
使用服务器发送事件(Server-Sent Events),可以像这样实现:
import * as EventSource from 'eventsource';
...

const eventSource = new EventSource("http://www.example.com/stream");

eventSource.onmessage = (event) => {
  const data = JSON.parse(event['data']);
}

请确保执行 npm install @types/eventsource 以避免 TypeScript 错误。 - Edgar Quintero

2

那是 NDJson 格式。我通过以下方式处理它:

let handled = 0
http.get<any>("http://localhost:9000/myapi", {
    observe: "events",
    reportProgress: true,
    responseType: "text" as "json",
}).pipe(
    filter((e: any) => e.type === HttpEventType.DownloadProgress && e.partialText),
    map(e => e.partialText.trim().split("\n"))
).subscribe((arr) => {
    for (let i = handled; i < arr.length; i++) {
        try {
            console.log(JSON.parse(arr[i])) // Do obs.next(obj) here
            handled = i + 1
        } catch (e) { }
    }
})

您可以将该代码包装在Observable中,并在console.log处进行obs.next(JSON.parse(arr[i]))


1

浏览器客户端除了使用服务器推送事件(Server-Sent Events)或 WebSocket,没有其他方式能够消费 JSON 流(application/stream+json)。

根据您所描述的需求和技术,WebSocket 更加适合。


0

我在思考的一个想法是,也许你可以将打开的连接(获取/提取过程)保持在一个“fire and forget”任务中,然后在另一个异步循环中检查是否有不同于存储内容的内容需要保存和显示。

或许可以这样做。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接