NestJs @Sse - 事件仅被一个客户端消费

6
我尝试了使用 nest.js 提供的样例 SSE 应用程序(28-SSE),并修改了 SSE 终端点以发送一个计数器:
  @Sse('sse')
  sse(): Observable<MessageEvent> {
    return interval(5000).pipe(
      map((_) => ({ data: { hello: `world - ${this.c++}` }} as MessageEvent)),
    );
  }

我期望每个监听该SSE的客户端都会收到消息,但当我打开多个浏览器标签时,我发现每条消息只被一个浏览器消费,所以如果我打开了三个浏览器,我会得到以下结果:

enter image description here

如何才能达到预期的行为?

你的截图中是同一个浏览器(配置文件)选项卡吗? - n1md7
即使在两个不同的浏览器(Chrome / Edge)中,行为也是相同的。 - Ronen
3个回答

6
为了实现您期望的行为,您需要为每个连接创建一个单独的流,并按您所需推送数据流。
以下是一种可能的简洁解决方案。
import { Controller, Get, MessageEvent, OnModuleDestroy, OnModuleInit, Res, Sse } from '@nestjs/common';
import { readFileSync } from 'fs';
import { join } from 'path';
import { Observable, ReplaySubject } from 'rxjs';
import { map } from 'rxjs/operators';
import { Response } from 'express';

@Controller()
export class AppController implements OnModuleInit, OnModuleDestroy {
  private stream: {
    id: string;
    subject: ReplaySubject<unknown>;
    observer: Observable<unknown>;
  }[] = [];
  private timer: NodeJS.Timeout;
  private id = 0;

  public onModuleInit(): void {
    this.timer = setInterval(() => {
      this.id += 1;
      this.stream.forEach(({ subject }) => subject.next(this.id));
    }, 1000);
  }

  public onModuleDestroy(): void {
    clearInterval(this.timer);
  }

  @Get()
  public index(): string {
    return readFileSync(join(__dirname, 'index.html'), 'utf-8').toString();
  }

  @Sse('sse')
  public sse(@Res() response: Response): Observable<MessageEvent> {
    const id = AppController.genStreamId();
    // Clean up the stream when the client disconnects
    response.on('close', () => this.removeStream(id));
    // Create a new stream
    const subject = new ReplaySubject();
    const observer = subject.asObservable();
    this.addStream(subject, observer, id);

    return observer.pipe(map((data) => ({
      id: `my-stream-id:${id}`,
      data: `Hello world ${data}`,
      event: 'my-event-name',
    }) as MessageEvent));
  }

  private addStream(subject: ReplaySubject<unknown>, observer: Observable<unknown>, id: string): void {
    this.stream.push({
      id,
      subject,
      observer,
    });
  }

  private removeStream(id: string): void {
    this.stream = this.stream.filter(stream => stream.id !== id);
  }

  private static genStreamId(): string {
    return Math.random().toString(36).substring(2, 15);
  }
}


你可以为此创建一个单独的服务,使其更加清洁,并从不同的地方推送流数据,但作为示例展示,结果如下所示屏幕截图。

enter image description here


在这种情况下使用回复主题的重要性是什么? - MrSolarius
2
SSE在NestJS中工作的唯一要求是返回一个Observable流。从技术上讲,您可以使用其中包含静态数据的new Observable(...)进行返回,但在实际应用场景中,您不希望返回静态数据,而是动态数据。当您想要将数据注入到流中时,可以使用SubjectReplaySubject,它们几乎相同。问题在于,使用Subject/ReplaySubject,您可以将其作为observable返回,并使用next方法将数据推送到流中,而在new Observable()中则不行。(至少我是这么认为的) - n1md7

2

对于同一主题的每个SSE连接,您只需生成一个新的可观察对象。

 private events: Subject<MessageEvent> = new Subject();

 constuctor(){
   timer(0, 1000).pipe(takeUntil(this.destroy)).subscribe(async (index: any)=>{
            let event: MessageEvent = { 
                id: index, 
                type: 'test', 
                retry: 30000, 
                data: {index: index}
            } as MessageEvent;
            this.events.next(event);
   });
 }

 @Sse('sse')
 public sse(): Observable<MessageEvent> {
   return this.events.asObservable();
 }

注意:我会跳过控制器代码的其余部分。
谢谢,

1
这种行为是正确的。每个SSE连接都是一个专用套接字,由专用服务器进程处理。因此,每个客户端可以接收不同的数据。这不是一种广播相同内容给多个客户端的技术。
“我如何获得期望的行为?”
有一个中央记录(例如在SQL数据库中)所需的值,您想要将其发送给所有连接的客户端。
然后,让每个SSE服务器进程监视或轮询该中央记录,并在每次更改时发送事件。

这种行为是正确的。每个SSE连接都是一个专用的套接字,并由专用的服务器进程处理。因此,每个客户端可以接收不同的数据。这不是一种广播相同内容给多个客户端的技术。这难道不与SSE的一对多广播概念相矛盾吗? - undefined
@sharon.biren SSE不是一对多的广播技术。(Websockets也不是。)如果你在某个地方看到它被描述成这样,那你看的是一个不可靠的参考资料。 - undefined
如果有多个客户端注册到相同的SSE端点(在NestJS中),那么当服务器返回消息事件时,它只会传递给一个客户端吗? - undefined
@sharon.biren 每个客户端到SSE端点的连接都是一个专用的套接字,服务器可以向每个客户端发送不同的消息,并且在不同的时间发送。然而,服务器可以维护一个包含所有这些连接的列表,并循环遍历它们并发送相同的消息给每个连接。因此,您可以在SSE的基础上构建一个广播相同消息给多个客户端的技术。 - undefined
这很奇怪,因为当我发出某个事件时,订阅了这个端点的两个Chrome标签会收到相同的消息,而不需要维护所有客户端连接的列表。我在NestJS中使用EventEmitter。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接