我们能否在不使用间隔的情况下在NestJS中使用服务器发送事件?

9

我正在使用 NestJS 创建几个微服务。

例如,我有xyz服务都通过GRPC相互连接,但我希望服务x在特定实体更改时向Web应用程序发送更新,因此我考虑使用服务器发送事件 [欢迎使用其他更好的解决方案]

根据NestJS文档,它们有一个在n间隔运行的函数来处理SSE路由,似乎很耗费资源。是否有一种方法可以在有更新时实际发送事件。

假设我在同一个服务中有另一个API调用,该调用是由另一个Web应用程序上的按钮单击触发的,那么如何仅在单击按钮时触发事件,而不是持续发送事件。如果您知道任何可实现此目的的惯用方法,将不会编写hacky代码,这将受到赞赏,希望它成为最后的手段。

[奖励问题]

我也考虑使用MQTT来发送事件。但我感觉不能让单个服务同时具备MQTT和gRPC。我对使用MQTT持怀疑态度,因为它的延迟如何影响内部消息传递。如果我可以将其限制为外部客户端,那就太好了(即x服务使用gRPC进行内部连接,而MQTT仅需要一个路由暴露给Web应用程序)。 (PS:我是微服务的新手,所以请对您的解决方案进行全面说明:p)

提前感谢你的耐心阅读!

4个回答

16

你可以这样做。重要的是,在 NestJS 中,SSE 是用 Observables 实现的,所以只要你有一个可观察对象,就可以将其添加到其中,并使用它来发送 SSE 事件。最简单的方法是使用 Subjects。我曾经在某个地方有一个例子,但通常情况下,它看起来会像这样

@Controller()
export class SseController {
  constructor(private readonly sseService: SseService) {}

  @SSE()
  doTheSse() {
    return this.sseService.sendEvents();
  }
}
@Injectable()
export class SseService {
  private events = new Subject();

  addEvent(event) {
    this.events.next(event);
  }

  sendEvents() {
    return this.events.asObservable();
  }
}
@Injectable()
export class ButtonTriggeredService {
  constructor(private readonly sseService: SseService) {}

  buttonClickedOrSomething() {
    this.sseService.addEvent(buttonClickedEvent);
  }
}

请原谅以上伪代码的性质,但总体而言,它展示了如何使用Subjects为SSE事件创建可观测对象。只要@SSE()端点返回具有正确形状的可观测对象,您就可以轻松处理。


1
有没有办法将事件主题隔离到特定用户? 对于控制器中的SSE,我正在执行类似@Sse('/status/update/:userId')的操作。 我在考虑是否应该使用map<userId, subject>。还有更好的解决方案吗? - Nikhil.Nixel
1
@Nikhil.Nixel,你找到了隔离特定用户的解决方案吗?我目前和你处于同样的情况。 - Mohaimin
1
我做了类似这样的事情,@Sse('/status/update/:userId'),然后我可以使用userId作为键将主题存储在映射中。如果您有大量用户,则不适合使用RAM,因为如果由于太多用户而有太多主题,它将继续占用这些空间。 - Nikhil.Nixel
我认为SSE不适合用于这些情况,以便隔离到特定实体。你可以尝试使用普通的套接字来获得更好的运气 @Mohaimin - Nikhil.Nixel
你是如何返回那个特定键(userId)的数据的? 关于套接字实现,是的,我也在考虑,但目前我只是好奇:P - Mohaimin
显示剩余7条评论

15

使用 NestJS 的 SSE 处理事件的更好方式:

请参考此存储库,其中包含代码示例:

https://github.com/ningacoding/nest-sse-bug/tree/main/src

基本上你有一个服务:

import {Injectable} from '@nestjs/common';
import {fromEvent} from "rxjs";
import {EventEmitter} from "events";

@Injectable()
export class EventsService {

    private readonly emitter = new EventEmitter();

    subscribe(channel: string) {
        return fromEvent(this.emitter, channel);
    }

    emit(channel: string, data?: object) {
        this.emitter.emit(channel, {data});
    }

}

显然,channel可以是任何字符串,建议使用路径样式。

例如:"events/for/<user_id>",订阅该频道的用户将仅接收该频道的事件,并且仅在触发时才会接收到;) -

完全兼容@UseGuards等。 :)

附注:不要在EventsService中注入任何服务,因为存在已知的错误。


@wopolow 不要在EventsService中注入任何服务,因为这会导致其停止工作。 - NingaCodingTRV
@NingaCodingTRV 现在似乎不会触发此错误。 - puz_zle
你从控制器文件中返回了什么? - Bennison J
@BennisonJ 请查看带有代码示例的存储库:https://github.com/ningacoding/nest-sse-bug/tree/main/src - NingaCodingTRV
@NingaCodingTRV 修好了吗?我克隆并运行了它,它可以工作。 - tunapq
显示剩余2条评论

0

是的,这是可能的,我们可以使用事件发射器而不是间隔。 每当事件被发射时,我们可以将响应发送回客户端。

notification.controller.ts

import { Public } from 'src/decorators';
import { Observable } from 'rxjs';
import { FastifyReply } from 'fastify';
import { NotificationService } from './notification.service';

import { Sse, Controller, Res } from '@nestjs/common';

@Public()
@Controller()
export class NotificationController {
  constructor(private notificationService: NotificationService) {}
  @Sse('notifications')
  async sendNotification(@Res() reply: FastifyReply): Promise<Observable<any>> {
    return await this.notificationService.handleConnection();
  }
}

notification.service.ts

import { Injectable } from '@nestjs/common';
import { Subject } from 'rxjs';

@Injectable()
export class NotificationService {
  notificationEvent: Subject<any> = new Subject();
  async handleConnection() {
    setInterval(() => {
      this.notificationEvent.next({ data: { message: 'Hello World' } });
    }, 1000);
    return this.notificationEvent.asObservable();
  }
}
  • 这里我使用了setInterval,只是为了发出一个新的事件。您可以在代码中任何地方发出新事件,而无需使用setInterval方法。
  • 我发现这种方法存在一些问题,并且我分享了下面的链接,请查看下面的链接。 nestJs中的SSE

-1
  @Sse('sse-endpoint')
  sse(): Observable<any> {
    //data have to strem
    const arr = ['d1','d2', 'd3']; 
    return new Observable((subscriber) => {
        while(arr.len){
            subscriber.next(arr.pop()); // data have to return in every chunk
        }
        if(arr.len == 0) subscriber.complete(); // complete the subscription
    });
  }

答案应包括为什么代码片段是解决方案的解释,请编辑此答案以添加一些上下文。 - hardillb

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接