下面是我为了以一种看起来干净的方式在NestJS中管理mongoose事务所做的事情。
首先,创建一个简单的抽象类,以通用的方式指定应该可用于管理事务的方法。
import { Injectable } from '@nestjs/common';
@Injectable()
export abstract class DbSession<T> {
public abstract start(): Promise<void>;
public abstract commit(): Promise<void>;
public abstract end(): Promise<void>;
public abstract abort(): Promise<void>;
public abstract get(): T | null;
}
然后你可以使用mongoose实现MongoDB的特殊功能
import { InjectConnection } from '@nestjs/mongoose';
import { Injectable } from '@nestjs/common';
import * as mongoose from 'mongoose';
import { RequestScope } from 'nj-request-scope';
import { DbSession } from '../../abstracts';
@Injectable()
@RequestScope()
export class MongoDbSession implements DbSession<mongoose.ClientSession> {
constructor(
@InjectConnection()
private readonly connection: mongoose.Connection
) {}
private session: mongoose.ClientSession | null = null;
public async start() {
if (this.session) {
if (this.session.inTransaction()) {
await this.session.abortTransaction();
await this.session.endSession();
throw new Error('Session already in transaction');
}
await this.session.endSession();
}
this.session = await this.connection.startSession();
this.session.startTransaction({
readConcern: { level: 'majority' },
writeConcern: { w: 'majority' },
readPreference: 'primary',
retryWrites: true,
});
}
public async commit() {
if (!this.session) {
throw new Error('Session not started');
}
await this.session.commitTransaction();
}
public async end() {
if (!this.session) {
throw new Error('Session not started');
}
await this.session.endSession();
}
public async abort() {
if (!this.session) {
throw new Error('Session not started');
}
await this.session.abortTransaction();
}
public get() {
return this.session;
}
}
我使用@RequestScope(),因为我希望我的单例具有请求范围,但我不希望此范围扩展到所有提供者。您可以在讨论和nj-request-scope README中了解更多信息。
然后,您可以在拦截器中使用此单例。
import {
CallHandler,
ExecutionContext,
Injectable,
Logger,
NestInterceptor,
} from '@nestjs/common';
import { Observable, tap } from 'rxjs';
import { DbSession } from '../abstracts/services';
@Injectable()
export class DbSessionInterceptor implements NestInterceptor {
constructor(private readonly dbSession: DbSession<unknown>) {}
private readonly logger = new Logger(DbSessionInterceptor.name);
async intercept(
_: ExecutionContext,
next: CallHandler
): Promise<Observable<any>> {
await this.dbSession.start();
return next.handle().pipe(
tap({
finalize: async () => {
await this.dbSession.commit();
await this.dbSession.end();
},
error: async (err: Error) => {
await this.dbSession.abort();
await this.dbSession.end();
this.logger.error(err);
throw err;
},
})
);
}
}
这样,如果您决定使用此拦截器,那么在请求处理开始时将自动创建一个事务,并在发送响应之前提交。
您可以通过控制器或解析器调用此拦截器。下面是一个装饰器,它自定义了默认的GraphQL解析器以暗示一个数据库事务。
import { UseInterceptors, applyDecorators } from '@nestjs/common';
import { Resolver } from '@nestjs/graphql';
import { DbSessionInterceptor } from '../abstracts';
export function ResolverWithDbSession(resolverParams?: any) {
return applyDecorators(
Resolver(resolverParams),
UseInterceptors(DbSessionInterceptor)
);
}
如果您需要在请求生命周期之外(在
onModuleInit()
步骤中进行固定操作期间)使用事务,则不能依赖注入器。因此,可以创建另一个装饰器:
import { Inject, Logger } from '@nestjs/common';
import { DbSession } from '../abstracts';
export function WithSessionDb() {
const dbSessionInjector = Inject(DbSession);
return function decorator(
target: any,
_propertyKey: string,
descriptor: any
): void {
dbSessionInjector(target, 'dbSession');
const method = descriptor.value;
const logger = new Logger(`${WithSessionDb.name}#${method.name}`);
descriptor.value = async function wrapper(...args: any[]) {
try {
await this.dbSession.start();
const result = await method.apply(this, args);
await this.dbSession.commit();
return result;
} catch (error) {
await this.dbSession.abort();
logger.error(error);
throw error;
} finally {
await this.dbSession.end();
}
};
};
}
我已经将所有这些文件组织在一个名为“数据服务”的单一模块中。
import { Global, Module } from '@nestjs/common';
import { RequestScopeModule } from 'nj-request-scope';
import { MongoDbSession } from './mongo';
import { DbSession } from './abstracts';
@Global()
@Module({
imports: [RequestScopeModule],
providers: [
{
provide: DbSession,
useClass: MongoDbSession,
},
],
exports: [DbSession],
})
export class DataServicesModule {}
在这个名为data-services.module.ts
的文件中,我注入了MongoDbSession而不是DbSession。
这个模块被声明为全局的,所以我只需要在我的应用程序模块中导入它。
如果你需要在你的服务中使用你的会话,
你只需要使用它的提供者:
import { DbSession } from '../../frameworks/data-services';
@Injectable()
export class SomeRandomService {
constructor(
@InjectModel(SomeRandomModel.name)
private readonly someRandomModel: Model<SomeRandomDocument>,
private readonly dbSession: DbSession<mongoose.ClientSession>
) {}
async countRandom() {
return this.someRandomModel.countDocuments().session(this.dbSession.get()).exec();
}
}
我希望这个解决方案能够帮到你。看起来有点过度,但是通过这样做,你就不必再担心事务了。