Nestjs:如何使用mongoose启动事务会话? Nestjs:如何使用mongoose启动事务会话?

9

使用事务的 mongoose 文档很简单,但在 nestjs 中遵循该文档后,会返回一个错误:

Connection 0 was disconnected when calling `startSession`
MongooseError: Connection 0 was disconnected when calling `startSession`
    at NativeConnection.startSession

我的代码:

const transactionSession = await mongoose.startSession();
    transactionSession.startTransaction();

    try
    {
      const newSignupBody: CreateUserDto = {password: hashedPassword, email, username};
  
      const user: User = await this.userService.create(newSignupBody);

      //save the profile.
      const profile: Profile = await this.profileService.create(user['Id'], signupDto);

      const result:AuthResponseDto = this.getAuthUserResponse(user, profile);

      transactionSession.commitTransaction();
      return result;
    }
    catch(err)
    {
      transactionSession.abortTransaction();
    }
    finally
    {
      transactionSession.endSession();
    }
3个回答

18

在研究 @nestjs/mongoose 之后,我找到了解决方案。这里的 mongoose 没有连接。这就是返回错误的原因。

解决方案:

import {InjectConnection} from '@nestjs/mongoose';
import * as mongoose from 'mongoose';
在服务类的构造函数中,我们需要添加连接参数,以便服务可以使用。
export class AuthService {
constructor(
  // other dependencies...
  @InjectConnection() private readonly connection: mongoose.Connection){}

相比于

const transactionSession = await mongoose.startSession();
transactionSession.startTransaction();

我们现在将使用:

const transactionSession = await this.connection.startSession();
transactionSession.startTransaction();

这样,startSession()后断开连接的问题就可以得到解决。


4
除了Noobish提供的答案外,我想展示一下我在项目中使用的可重复使用的函数:
import { ClientSession, Connection } from 'mongoose';

export const transaction = async <T>(connection: Connection, cb: (session: ClientSession) => Promise<T>): Promise<T> => {
  const session = await connection.startSession();

  try {
    session.startTransaction();
    const result = await cb(session);
    await session.commitTransaction();
    return result;
  } catch (err) {
    await session.abortTransaction();
    throw err;
  } finally {
    await session.endSession();
  }
}

可以这样使用:
@Injectable()
export class MyService {
  constructor(
    @InjectModel(MyModel.name) private myModel: Model<MyModelDocument>,
    @InjectConnection() private connection: Connection,
  ) {}

  async find(id: string): Promise<MyModelDocument> {
    return transaction(this.connection, async session => {
      return this.myModel
        .findOne(id)
        .session(session);
    });
  }

  async create(myDto: MyDto): Promise<MyModelDocument> {
    return transaction(this.connection, async session => {
      const newDoc = new this.myModel(myDto);
      return newDoc.save({ session });
    });
  }

显然,上面的示例仅用于演示目的,其中不需要进行交易,因为操作已经是原子的。然而,我们可以通过一个更复杂的示例来扩展内部回调函数,其中操作不是原子的,例如:
  1. 创建一个引用另一个模式的文档
  2. 查找该引用文档是否存在,如果存在,则将其添加到数组中(一对多关系)
  3. 如果引用文档不存在,则抛出错误。这将中止事务并回滚所有更改,例如删除在步骤1中创建的文档。在所有这些步骤中,需要仔细指定会话。

0

下面是我为了以一种看起来干净的方式在NestJS中管理mongoose事务所做的事情。

首先,创建一个简单的抽象类,以通用的方式指定应该可用于管理事务的方法。

import { Injectable } from '@nestjs/common';

@Injectable()
export abstract class DbSession<T> {
  public abstract start(): Promise<void>;
  public abstract commit(): Promise<void>;
  public abstract end(): Promise<void>;
  public abstract abort(): Promise<void>;
  public abstract get(): T | null;
}

然后你可以使用mongoose实现MongoDB的特殊功能

import { InjectConnection } from '@nestjs/mongoose';
import { Injectable } from '@nestjs/common';
import * as mongoose from 'mongoose';
import { RequestScope } from 'nj-request-scope';

import { DbSession } from '../../abstracts';

@Injectable()
@RequestScope()
export class MongoDbSession implements DbSession<mongoose.ClientSession> {
  constructor(
    @InjectConnection()
    private readonly connection: mongoose.Connection
  ) {}

  private session: mongoose.ClientSession | null = null;

  public async start() {
    if (this.session) {
      if (this.session.inTransaction()) {
        await this.session.abortTransaction();
        await this.session.endSession();
        throw new Error('Session already in transaction');
      }
      await this.session.endSession();
    }
    this.session = await this.connection.startSession();
    this.session.startTransaction({
      readConcern: { level: 'majority' },
      writeConcern: { w: 'majority' },
      readPreference: 'primary',
      retryWrites: true,
    });
  }

  public async commit() {
    if (!this.session) {
      throw new Error('Session not started');
    }
    await this.session.commitTransaction();
  }

  public async end() {
    if (!this.session) {
      throw new Error('Session not started');
    }
    await this.session.endSession();
  }

  public async abort() {
    if (!this.session) {
      throw new Error('Session not started');
    }
    await this.session.abortTransaction();
  }

  public get() {
    return this.session;
  }
}

我使用@RequestScope(),因为我希望我的单例具有请求范围,但我不希望此范围扩展到所有提供者。您可以在讨论和nj-request-scope README中了解更多信息。

然后,您可以在拦截器中使用此单例。

import {
  CallHandler,
  ExecutionContext,
  Injectable,
  Logger,
  NestInterceptor,
} from '@nestjs/common';
import { Observable, tap } from 'rxjs';

import { DbSession } from '../abstracts/services';

@Injectable()
export class DbSessionInterceptor implements NestInterceptor {
  constructor(private readonly dbSession: DbSession<unknown>) {}

  private readonly logger = new Logger(DbSessionInterceptor.name);

  async intercept(
    _: ExecutionContext,
    next: CallHandler
  ): Promise<Observable<any>> {
    await this.dbSession.start();

    return next.handle().pipe(
      tap({
        finalize: async () => {
          await this.dbSession.commit();
          await this.dbSession.end();
        },
        error: async (err: Error) => {
          await this.dbSession.abort();
          await this.dbSession.end();
          this.logger.error(err);
          throw err;
        },
      })
    );
  }
}

这样,如果您决定使用此拦截器,那么在请求处理开始时将自动创建一个事务,并在发送响应之前提交。

您可以通过控制器或解析器调用此拦截器。下面是一个装饰器,它自定义了默认的GraphQL解析器以暗示一个数据库事务。

import { UseInterceptors, applyDecorators } from '@nestjs/common';
import { Resolver } from '@nestjs/graphql';

import { DbSessionInterceptor } from '../abstracts';

export function ResolverWithDbSession(resolverParams?: any) {
  return applyDecorators(
    Resolver(resolverParams),
    UseInterceptors(DbSessionInterceptor)
  );
}


如果您需要在请求生命周期之外(在onModuleInit()步骤中进行固定操作期间)使用事务,则不能依赖注入器。因此,可以创建另一个装饰器:
import { Inject, Logger } from '@nestjs/common';

import { DbSession } from '../abstracts';

export function WithSessionDb() {
  const dbSessionInjector = Inject(DbSession);

  return function decorator(
    target: any,
    _propertyKey: string,
    descriptor: any // PropertyDescriptor
  ): void {
    dbSessionInjector(target, 'dbSession');
    const method = descriptor.value;
    const logger = new Logger(`${WithSessionDb.name}#${method.name}`);

    descriptor.value = async function wrapper(...args: any[]) {
      try {
        await this.dbSession.start();
        const result = await method.apply(this, args);
        await this.dbSession.commit();
        return result;
      } catch (error) {
        await this.dbSession.abort();
        logger.error(error);
        throw error;
      } finally {
        await this.dbSession.end();
      }
    };
  };
}


我已经将所有这些文件组织在一个名为“数据服务”的单一模块中。
import { Global, Module } from '@nestjs/common';
import { RequestScopeModule } from 'nj-request-scope';

import { MongoDbSession } from './mongo';
import { DbSession } from './abstracts';

@Global()
@Module({
  imports: [RequestScopeModule],
  providers: [
    {
      provide: DbSession,
      useClass: MongoDbSession,
    },
  ],
  exports: [DbSession],
})
// eslint-disable-next-line @typescript-eslint/no-extraneous-class
export class DataServicesModule {}

在这个名为data-services.module.ts的文件中,我注入了MongoDbSession而不是DbSession。

这个模块被声明为全局的,所以我只需要在我的应用程序模块中导入它。

如果你需要在你的服务中使用你的会话, 你只需要使用它的提供者:

// ...
import { DbSession } from '../../frameworks/data-services';
// ...

@Injectable()
export class SomeRandomService {
  constructor(
    @InjectModel(SomeRandomModel.name)
    private readonly someRandomModel: Model<SomeRandomDocument>,
    // ...
    private readonly dbSession: DbSession<mongoose.ClientSession>
  ) {}

  
  async countRandom() {
    return this.someRandomModel.countDocuments().session(this.dbSession.get()).exec();
  }

  // ...
}

我希望这个解决方案能够帮到你。看起来有点过度,但是通过这样做,你就不必再担心事务了。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接