在深入讨论问题之前,让我先向您解释一下我的应用程序的基本情况。
我的应用程序连接了DB(TypeOrm)和Kafka(kafkajs)。
我的应用程序是1个主题的消费者,它执行以下操作:
- 在回调处理程序中获取一些数据,并使用TypeORM实体将该数据放入一个表中
- 在某个类的单例实例中维护全局映射(来自点1的数据)
在应用程序关闭时,我的任务是:
- 断开与Kafka连接的所有主题的消费者
- 遍历全局映射(点2)并将消息重新发送到某个主题
- 使用close方法断开DB连接
以下是一些代码片段,这可能有助于您了解我如何在NestJs中为服务器添加生命周期事件。
system.server.life.cycle.events.ts
@Injectable()
export class SystemServerLifeCycleEventsShared implements BeforeApplicationShutdown {
constructor(@Inject(WINSTON_MODULE_PROVIDER) private readonly logger: Logger, private readonly someService: SomeService) {}
async beforeApplicationShutdown(signal: string) {
const [err] = await this.someService.handleAbruptEnding();
if (err) this.logger.info(`beforeApplicationShutdown, error::: ${JSON.stringify(err)}`);
this.logger.info(`beforeApplicationShutdown, signal ${signal}`);
}
}
some.service.ts
export class SomeService {
constructor(private readonly kafkaConnector: KafkaConnector, private readonly postgresConnector: PostgresConnector) {}
public async handleAbruptEnding(): Promise<any> {
await this.kafkaConnector.disconnectAllConsumers();
for(READ_FROM_GLOBAL_STORE) {
await this.kafkaConnector.function.call.to.repark.the.message();
}
await this.postgresConnector.disconnectAllConnections();
return true;
}
}
postgres.connector.ts
export class PostgresConnector {
private connectionManager: ConnectionManager;
constructor () {
this.connectionManager = getConnectionManager();
}
public async disconnectAllConnections(): Promise<void[]> {
const connectionClosePromises: Promise<void> = [];
connectionManager.connections?.forEach((connection) => {
if (connection.isConnected) connectionClosePromises.push(connection.close());
});
return Promise.all(connectionClosePromises);
}
}
从TypeORM模块导入ConnectionManager& getConnectionManager()。
现在我遇到了一些异常/行为:
断开所有连接会抛出异常/错误,如引用所示:
ERROR [TypeOrmModule] Cannot execute operation on "default" connection because connection is not yet established. 如果连接还没有建立,那么我的isConnected怎么会在if中为真。我无法得知这是如何可能的。在TypeORM中如何优雅地关闭连接。
我们是否真的需要在TypeORM中处理连接的关闭或者它在内部处理。
即使TypeORM在内部处理连接关闭,我们如何显式实现它。
是否有回调函数可以在连接正确断开时触发,以便我确信断开连接实际上发生在数据库中。
一些消息是在我按下CTRL + C后(模拟我的服务器进程的突然关闭)出现的,控制权回到终端。这意味着,在句柄返回到我的终端之后,某个线程正在返回(不知道如何处理此问题,因为如果您看一下,我的handleAbruptHandling是awaited的,并且我交叉检查了所有承诺都被正确地等待了。)
需要知道的一些事情:
- 我已正确添加了模块以创建服务器生命周期事件的钩子。
- 已正确注入几乎所有类中的对象。
- 没有从NEST获得任何DI问题,服务器正在正确启动。
请指点迷津,并告诉我如何在NestJs中使用typeorm api优雅地断开与数据库的连接以处理突然关闭的情况。
提前感谢您的帮助,祝编码愉快 :)
await getConnectionManager().get()?.close();
。 - Umur Karagöz