NestJS NATS请求响应

9

我正在尝试使用NestJS和NATS微服务。有很好的文档可以设置基本的请求-响应。

我的做法如下:

运行一个本地NATS服务器。

设置main.ts连接到该服务器:

async function bootstrap() {
  const app = await NestFactory.createMicroservice(AppModule, {
    options: {
      url: "nats://localhost:4222",
    },
    transport: Transport.NATS,
  });
  app.listen(() => console.log("Microservice is listening"));
}
bootstrap();

创建了一个ClientProxyFactory来发送消息:
export const NatsClientProvider: Provider = {
  inject: [ConfigService],
  provide: NatsClientProviderId,
  useFactory: async (config: ConfigService) =>
    ClientProxyFactory.create({
      options: {
        servers: config.getNatsConfig().servers,
      },
      transport: Transport.NATS,
    }),
};

设置一个控制器 app.controller.ts 来响应特定的模式:

@Controller()
export class AppController {
  constructor(
    private readonly appService: AppService,
    @Inject(NatsClientProviderId) private readonly natsClient: ClientProxy,
  ) {}

  @MessagePattern("hello")
  async getHello(data: string) {
    console.log("data: ", data);
    console.log("getHello!!");
    await this.natsClient.send("hello", this.appService.getHello());
    return this.appService.getHello();
  }

  async onModuleInit() {
    await this.natsClient.connect();
    console.log("Nats connected!");
  }

创建一个测试文件来尝试发送请求-响应消息:

import { connect } from "ts-nats";

async function start() {
  const nc = await connect({
    servers: ["nats://localhost:4222"],
  });

  const msg = await nc.request("hello", 5000, "me");
  console.log("msg: ", msg);
}

start();

当我运行我的 Nest 应用程序时,我可以在 NATS 服务器日志中正确地看到订阅已创建。
当我运行 test.ts 文件时,它会超时并显示 NatsError: Request timed out.。但是,我可以看到我的控制台日志(尽管数据为 undefined,即使我在发布的消息中指定了它)。 returnclient.send 方法都无法接收来自应用程序的消息。
非常感谢您提供帮助!
编辑: 仍在研究和困扰这个问题。在 Microservice docs 的“发送消息”部分中,它说:“模式必须等于 @MessagePattern() 装饰器中定义的模式,而有效载荷是我们想要传输到另一个微服务的消息。”如果我这样做,Nest 应用程序会检测到它发送的消息,并陷入一个无限循环,不断地将消息发送和接收回自己,直到永远。
2个回答

5
为避免控制器中的无限循环,请删除natsClient.send语句。在您的情况下,MessagePattern将自动使用您从函数返回的数据发送回复,即this.appService.getHello()
@MessagePattern("hello")
async getHello(data: string) {
  console.log("data: ", data);
  return "Hello World!";
}

Nest要求你发送一个长的id属性(任何字符串都可以)才能回复一条消息。只需将其包含在数据json中即可:

// Nest expects the data to have the following structure
const reply = await nc.request("hello", 500, JSON.stringify({ data: "Hello", id: "myid" }));
console.log({ reply });

在您的嵌套日志中,您会看到以下日志条目:
data: Hello

在你的测试脚本中,你会看到这个:

{ reply:
   { subject: '_INBOX.GJGL6RJFYXKMCF8CWXO0HB.GJGL6RJFYXKMCF8CWXO0B5',
     sid: 1,
     reply: undefined,
     size: 50,
     data: '{"err":null,"response":"Hello World!","id":"myid"}' 
} }

抱歉,你是对的,之前的不正确。请看我的修改。缺失的部分是 id 字段。 - Kim Kern
谢谢您的帮助!在Nest中是否有任何方法也可以获取输入消息的主题?例如,如果我想订阅user.get.>,以便我可以接收到发送到user.get.MY_USER_ID的消息,并能够看到MY_USER_ID是主题中的内容。 - rhlsthrm
我不知道。:/ 它是否支持嵌套中的通配符? - Kim Kern
我刚刚测试了一下,实际上它可以与通配符订阅一起使用,就像我提供的示例(@MessagePattern('hello.>')和请求主题hello.world)。现在,如果我能够打印出主题,那就没问题了。 - rhlsthrm
1
我认为这部分不在API中,但您可以为此打开问题(甚至拉取请求)。我认为这不应该太难。 - Kim Kern
显示剩余4条评论

3
当使用ClientProxy时,sendemit返回Observables。需要“激活”它们才能执行任何操作。因此,您可以将其订阅或更改为Promise。
由于您正在使用await,因此您可能想要执行以下操作:
await this.natsClient.send("hello", this.appService.getHello()).toPromise();

嗯,谢谢提供的信息。我尝试了这个方法,但仍然出现了无限循环的问题,服务一直在响应自己的消息(看起来是由于@MessagePattern消息与正在发送的主题相同所致)。虽然我假设为了实现请求-响应功能需要这样做,但我不明白如何使它不响应自身,因为ClientProxy和Nest微服务都连接到同一个NATS服务器。 - rhlsthrm

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接