模拟AWS服务和Lambda最佳实践

5
我正在开发一个简单的AWS Lambda函数,它由DynamoDB Streams事件触发,并应将除“REMOVE”事件外的所有记录转发到SQS队列。该函数按预期工作,没有任何意外。
我想编写一个单元测试来测试不提交任何内容到SQS时的行为,当它是一个“DELETE”事件时。我首先尝试使用“aws-sdk-mock”来实现这一点。正如您在函数代码中所看到的那样,我尝试遵循Lambda最佳实践,通过在处理程序代码之外初始化SQS客户端。显然,这会防止“aws-sdk-mock”能够模拟SQS服务(GitHub上有一个问题:https://github.com/dwyl/aws-sdk-mock/issues/206)。
然后我尝试使用“jest”来模拟SQS,这需要更多的代码才能做到正确,但我最终遇到了同样的问题,被要求将SQS的初始化放在处理程序函数内部,这违反了Lambda最佳实践。
我该如何编写一个单元测试来测试这个函数,同时让SQS客户端的初始化(const sqs: SQS = new SQS())在处理程序之外进行?我是错误地模拟了服务还是处理程序的结构必须改变才能更轻松地进行测试?我知道这个Lambda函数非常简单,可能不需要进行单元测试,但我将不得不编写更复杂逻辑的其他Lambda函数,并且我认为这个函数非常适合演示问题。

index.ts

import {DynamoDBStreamEvent, DynamoDBStreamHandler} from "aws-lambda";
import SQS = require("aws-sdk/clients/sqs");
import DynamoDB = require("aws-sdk/clients/dynamodb");

const sqs: SQS = new SQS()

export const handleDynamoDbEvent: DynamoDBStreamHandler = async (event: DynamoDBStreamEvent, context, callback) => {
    const QUEUE_URL = process.env.TARGET_QUEUE_URL
    if (QUEUE_URL.length == 0) {
        throw new Error('TARGET_QUEUE_URL not set or empty')
    }
    await Promise.all(
        event.Records
            .filter(_ => _.eventName !== "REMOVE")
            .map((record) => {
                const unmarshalled = DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);
                let request: SQS.SendMessageRequest = {
                    MessageAttributes: {
                        "EVENT_NAME": {
                            DataType: "String",
                            StringValue: record.eventName
                        }
                    },
                    MessageBody: JSON.stringify(unmarshalled),
                    QueueUrl: QUEUE_URL,
                }
                return sqs.sendMessage(request).promise()
            })
    );
}

index.spec.ts

import {DynamoDBRecord, DynamoDBStreamEvent, StreamRecord} from "aws-lambda";
import {AttributeValue} from "aws-lambda/trigger/dynamodb-stream";
import {handleDynamoDbEvent} from "./index";
import {AWSError} from "aws-sdk/lib/error";
import {PromiseResult, Request} from "aws-sdk/lib/request";
import * as SQS from "aws-sdk/clients/sqs";
import {mocked} from "ts-jest/utils";
import DynamoDB = require("aws-sdk/clients/dynamodb");


jest.mock('aws-sdk/clients/sqs', () => {
    return jest.fn().mockImplementation(() => {
        return {
            sendMessage: (params: SQS.Types.SendMessageRequest, callback?: (err: AWSError, data: SQS.Types.SendMessageResult) => void): Request<SQS.Types.SendMessageResult, AWSError> => {
                // @ts-ignore
                const Mock = jest.fn<Request<SQS.Types.SendMessageResult, AWSError>>(()=>{
                    return {
                        promise: (): Promise<PromiseResult<SQS.Types.SendMessageResult, AWSError>> => {
                            return new Promise<PromiseResult<SQS.SendMessageResult, AWSError>>(resolve => {
                                resolve(null)
                            })
                        }
                    }
                })
                return new Mock()
            }
        }
    })
});


describe.only('Handler test', () => {

    const mockedSqs = mocked(SQS, true)

    process.env.TARGET_QUEUE_URL = 'test'
    const OLD_ENV = process.env;

    beforeEach(() => {
        mockedSqs.mockClear()
        jest.resetModules();
        process.env = {...OLD_ENV};
    });

    it('should write INSERT events to SQS', async () => {
        console.log('Starting test')
        await handleDynamoDbEvent(createEvent(), null, null)
        expect(mockedSqs).toHaveBeenCalledTimes(1)
    });
})
2个回答

1

我会这样处理:

  • 在主函数中,不会进行实际的SQS发送/操作,而是创建一个消息客户端接口,类似于以下内容:
interface QueueClient {
    send(eventName: string, body: string): Promise<any>;
}
  • 创建一个实际的类来实现该接口,以便与SQS进行交互:
class SQSQueueClient implements QueueClient {
    queueUrl: string
    sqs: SQS

    constructor() {
        this.queueUrl = process.env.TARGET_QUEUE_URL;
        if (this.queueUrl.length == 0) {
            throw new Error('TARGET_QUEUE_URL not set or empty')
        }
        this.sqs = new SQS();
    }

    send(eventName: string, body: string): Promise<any> {
        let request: SQS.SendMessageRequest = {
            MessageAttributes: {
                "EVENT_NAME": {
                    DataType: "String",
                    StringValue: eventName
                }
            },
            MessageBody: body,
            QueueUrl: this.queueUrl,
        }
        return this.sqs.sendMessage()
    }
}

这个类了解如何将数据转换为SQS格式的详细信息。
我会将主函数分为两部分。入口点只解析队列URL,创建一个实际的SQS队列客户端实例并调用process()。主要逻辑在process()中。
const queueClient = new SQSQueueClient();

export const handleDynamoDbEvent: DynamoDBStreamHandler = async (event: DynamoDBStreamEvent, context, callback) => {
    return process(queueClient, event);
}

export const process = async (queueClient: QueueClient, event: DynamoDBStreamEvent) => {
    return await Promise.all(
        event.Records
            .filter(_ => _.eventName !== "REMOVE")
            .map((record) => {
                const unmarshalled = DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);
                return queueClient.send(record.eventName, JSON.stringify(unmarshalled));
            })
    );
}
  • 现在测试process()中的主要逻辑要容易得多。您可以通过手写一个实现QueueClient接口的模拟实例,或使用您喜欢的任何模拟框架来提供一个模拟实例。
  • 对于SQSQueueClient类,单元测试的好处不是很大,因此我将更多地依赖于集成测试(例如使用类似localstack的东西)。

我现在没有实际的IDE,所以如果有语法错误,请原谅。


1
在这个例子中,每次调用处理程序时都会调用new SQSQueueClient(QUEUE_URL),这将导致构造函数中的new SQS()被调用。这与Lambda最佳实践不符。 - mheck
在这种情况下,SQSQueueClient 的创建可以移至外部,队列 URL 可能需要作为参数传递给 send()。但是,想法可能仍然相同。 - Phuong Nguyen
@mheck 我刚刚修改了代码,将SQSQueueClient的创建移动到外部。从环境变量解析队列URL的逻辑也可以移到SQSQueueClient的构造函数中。 - Phuong Nguyen
你基本上是在实现依赖注入。顶层处理程序函数会解决依赖关系并将它们作为参数注入到实际的处理程序实现中。我认为,即使没有QueueClient接口,我也可以这样做。模拟SQS不是问题,但让处理程序使用它确实是个问题。这样,应该就能行了。 - mheck

1
我添加了一个初始化方法,该方法从处理程序函数内部调用。如果它之前已被调用过,则立即返回,并且否则将初始化SQS客户端。它可以轻松地扩展到初始化其他客户端。
这符合Lambda最佳实践,使测试代码能够正常工作。
let sqs: SQS = null
let initialized = false

export const handleDynamoDbEvent: DynamoDBStreamHandler = async (event: DynamoDBStreamEvent, context, callback) => {
    init()
    const QUEUE_URL = process.env.TARGET_QUEUE_URL
    if (QUEUE_URL.length == 0) {
        throw new Error('TARGET_QUEUE_URL not set or empty')
    }
    await Promise.all(
        event.Records
            .filter(_ => _.eventName !== "REMOVE")
            .map((record) => {
                const unmarshalled = DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);
                let request: SQS.SendMessageRequest = {
                    MessageAttributes: {
                        "EVENT_NAME": {
                            DataType: "String",
                            StringValue: record.eventName
                        }
                    },
                    MessageBody: JSON.stringify(unmarshalled),
                    QueueUrl: QUEUE_URL,
                }
                return sqs.sendMessage(request).promise()
            })
    );
}

function init() {
    if (initialized) {
        return
    }
    console.log('Initializing...')
    initialized = true
    sqs = new SQS()
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接