使用Redis实现NestJS外部事件总线

4
我正在尝试将我的nestjs应用程序的cqrs设置与外部消息服务(如Redis)集成。在nestjs的Github上,我找到了一个拉取请求和一个评论,说明从cqrs 7.0版本开始,应该能够将我的查询/事件/命令总线与外部服务集成。
我一直在努力实现这一点,但在nestjs上并没有找到太多关于此主题的信息。我唯一能找到的是一个已过时的配置示例和一个开放主题,用于创建有关如何实现此功能的教程。我设法通过在GitHub上找到的有限帮助来替换默认的发布者和订阅者,但我不太明白如何使用它来连接到外部服务,或者是否这是解决此问题的最佳方法。

事件总线

import { RedisEventSubscriber } from '../busses/redisEventSubscriber';
import { RedisEventPublisher } from '../busses/redisEventPublisher';
import { OnModuleInit } from '@nestjs/common';
import { ModuleRef } from "@nestjs/core";
import { CommandBus, EventBus as NestJsEventBus } from "@nestjs/cqrs";

export class EventBus extends NestJsEventBus implements OnModuleInit {

constructor( commandBus: CommandBus, moduleRef: ModuleRef) {
  super(commandBus, moduleRef);
}

onModuleInit() {

  const subscriber = new RedisEventSubscriber();
  subscriber.bridgeEventsTo(this._subject$);
  this.publisher = new RedisEventPublisher();

  }
}

Publisher

export class RedisEventPublisher implements IEventPublisher {

publish<T extends IEvent = IEvent>(event: T) {
  console.log("Event published to Redis")
  }
}

订阅者

export class RedisEventSubscriber implements IMessageSource {

  bridgeEventsTo<T extends IEvent>(subject: Subject<T>) {
    console.log('bridged event to thingy')
  }
}

如果有人之前已经使用外部消息系统设置过nestjs,能够分享他们的想法或共享如何正确执行此操作的资源,将不胜感激。
1个回答

10
经过几天的努力,我找到了两种连接外部事件总线的方法。我发现我并不真正需要外部命令或查询总线,因为它们是通过API调用传递的。因此,如果你想在NestJS中连接到外部事件总线,这里有两个选项可供选择:
1. 通过自定义发布者和订阅者 2. 通过NestJS微服务包
这两种方法主要区别在于它们连接到外部事件总线的方式以及它们处理接收到的消息的方式。根据你的需求,其中一种可能比另一种更适合你,但我选择了第一种选项。
自定义发布者和订阅者
在我的应用程序中,我已经在使用NestJS的EventBus类通过调用`.publish()`对我的事件进行手动发布。我创建了一个服务,它包装了本地NestJS事件总线以及自定义的发布者和订阅者。
eventBusService.ts
export class EventBusService implements IEventBusService {
  
  constructor(
    private local: EventBus, // Injected from NestJS CQRS Module
    @Inject('eventPublisher') private publisher: IEventPublisher,
    @Inject('eventSubscriber') subscriber: IMessageSource) {
      subscriber.bridgeEventsTo(this.local.subject$);
   }
  
  publish(event: IEvent): void {
    this.publisher.publish(event);
  };
} 

事件服务使用自定义订阅器通过 .bridgeEventsTo() 重定向远程事件总线上的所有传入事件到本地事件总线。自定义订阅器使用Redis NPM包的客户端与事件总线进行通信。

subscriber.ts

export class RedisEventSubscriber implements IMessageSource {

  constructor(@Inject('redisClient') private client: RedisClient) {}

  bridgeEventsTo<T extends IEvent>(subject: Subject<T>) {
    this.client.subscribe('Foo');
    this.client.on("message", (channel: string, message: string) => {

      const { payload, header } = JSON.parse(message);
      const event = Events[header.name];

      subject.next(new event(data.event.payload));
    });
  }
};

这个函数还包含了将一个传入的Redis事件映射成一个事件的逻辑。为此,我在app.module中创建了一个包含所有应用程序事件的字典,以便查找该事件是否知道如何处理传入的事件。然后,它使用一个新的事件调用subject.next(),使其放置在内部的NestJS事件总线上。

publisher.ts

为了从我的自定义事件更新其他系统,我创建了一个发布者,将数据发送到Redis。

export class RedisEventPublisher implements IEventPublisher {

  constructor(@Inject('redisClient') private client: RedisClient) {}

  publish<T extends IEvent = IEvent>(event: T) {
    const name = event.constructor.name;
    const request = {
      header: {
        name
      },
      payload: {
        event
      }
    }
    this.client.publish('Foo', JSON.stringify(request));
  }
}

和订阅者一样,这个类使用NPM包client将事件发送到Redis eventBus。


微服务设置

在某些方面,微服务的设置与自定义事件服务方法非常相似。它使用相同的发布者类,但是订阅设置是不同的。它使用NestJS微服务包来设置一个微服务,监听传入的消息,然后调用eventService将传入的事件发送到eventbus。

eventService.ts

export class EventBusService implements IEventBusService {
  
  constructor(
    private eventBus: EventBus,
    @Inject('eventPublisher') private eventPublisher: IEventPublisher,) {
   }
  
  public publish<T extends IEvent>(event: T): void {

    const data = {
      payload: event,
      eventName: event.constructor.name
    }
    
    this.eventPublisher.publish(data);
  };

  async handle(string: string) : Promise<void> {

    const data = JSON.parse(string);
    const event = Events[data.event.eventName];

    if (!event) {
      console.log(`Could not find corresponding event for 
      ${data.event.eventName}`);
    };

    await this.eventBus.publish(new event(data.event.payload));
  }
} 

NestJS有关于如何设置混合服务的文档,详见这里。微服务包提供了一个@EventPattern()装饰器,您可以用来创建处理传入事件总线消息的处理程序,只需将它们添加到NestJS控制器中并注入eventService即可。

controller.ts

@Controller()
export default class EventController {

  constructor(@Inject('eventBusService') private eventBusService: 
  IEventBusService) {}

  @EventPattern(inviteServiceTopic)
  handleInviteServiceEvents(data: string) {
    this.eventBusService.handle(data)
  }
}

由于我不想创建一个混合应用程序来监听传入的事件,所以我决定采用第一种选项。代码被很好地组合在一起,而不是具有随机控制器和@EventPattern()装饰器。

这花费了相当长的时间才弄清楚,所以我希望它能对未来的某个人有所帮助。 :)


不知道你是否有公开的 GitHub/GitLab 存储库链接,包含完整的代码? - Jérôme
不,我没有在公共仓库中。所有你需要的代码都在答案里了。唯一缺少的是一些常规的NestJS部分,例如控制器和注入部分,但可以在文档中找到。 - Jordi
文档没有很好地涵盖这个问题。你考虑过创建一个PR来添加它吗? - Donny Verduijn

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接