Async Lambda function: returning a promise or sending to the ResponseURL does not terminate the CloudFormation custom resource invocation

17

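I have a Lambda function that is invoked as a custom resource from a CloudFormation template. It creates/deletes AWS Connect instances. The API calls work fine, but I cannot seem to terminate the custom resource invocation, so the last CF block stays in CREATE_IN_PROGRESS. No matter what I return from the async function, it never terminates the CF execution with a success.

I can successfully use a non-async handler, as in https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/walkthrough-custom-resources-lambda-lookup-amiids.html, but I need to make multiple API calls and wait for them to complete, hence the need for an async handler.

Below is the code in its simplest form, though I have tried just about everything, including using callback and context (i.e. exports.handler = async function(event, context, callback) {...}), both of which should be unnecessary with an async handler. I tried using cfn-response to send the response directly, but it seems to be ignored by the async handler. I tried returning the promise directly, with and without await, and tried returning variables containing various responseStatus and responseData values; nothing seems to work.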

Transform: 'AWS::Serverless-2016-10-31'
Parameters:
  IdentityManagementType:
    Description: The type of identity management for your Amazon Connect users.
    Type: String
    AllowedValues: ["SAML", "CONNECT_MANAGED", "EXISTING_DIRECTORY"]
    Default: "SAML"
  InboundCallsEnabled:
    Description: Whether your contact center handles incoming contacts.
    Type: String
    AllowedValues: [true, false]
    Default: true
  InstanceAlias:
    Description: The name for your instance.
    Type: String
    MaxLength: 62
  OutboundCallsEnabled:
    Description: Whether your contact center allows outbound calls.
    Type: String
    AllowedValues: [true, false]
    Default: true
  DirectoryId:
    Description: Optional. The identifier for the directory, if using this type of Identity Management.
    Type: String
  ClientToken:
    Description: Optional. The idempotency token. Used for concurrent deployments
    Type: String
    MaxLength: 500
  Region:
    Description: Region to place the AWS Connect Instance
    Type: String
    Default: us-east-1
#Handler for optional values
Conditions:
  HasClientToken: !Not
    - !Equals
      - ""
      - !Ref ClientToken
  HasDirectoryId: !Not
    - !Equals
      - ""
      - !Ref DirectoryId

Resources:
  CreateConnectInstance:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub "${AWS::StackName}-AWSConnectInstance"
      Handler: index.handler
      Runtime: nodejs12.x
      Description: Invoke a function to create an AWS Connect instance.
      MemorySize: 128
      Timeout: 30
      Role: !GetAtt LambdaExecutionRole.Arn
      Layers:
        - !Sub "arn:aws:lambda:us-east-1:${AWS::AccountId}:layer:node_sdk:1"
      Environment:
        Variables:
          IdentityManagementType:
            Ref: IdentityManagementType
          InboundCallsEnabled:
            Ref: InboundCallsEnabled
          InstanceAlias:
            Ref: InstanceAlias
          OutboundCallsEnabled:
            Ref: OutboundCallsEnabled
          Region:
            Ref: Region
          #Optional Values
          ClientToken: !If
            - HasClientToken
            - !Ref ClientToken
            - !Ref "AWS::NoValue"
          DirectoryId: !If
            - HasDirectoryId
            - !Ref DirectoryId
            - !Ref "AWS::NoValue"
      InlineCode: |
        var aws = require("aws-sdk");
        exports.handler = async function(event) {
            console.log("REQUEST RECEIVED:\n" + JSON.stringify(event));
            var connect = new aws.Connect({region: event.ResourceProperties.Region});
            var isInboundCallsEnabled = (process.env.InboundCallsEnabled == 'true');
            var isOutboundCallsEnabled = (process.env.OutboundCallsEnabled == 'true');
            var createInstanceParams = {
                InboundCallsEnabled: isInboundCallsEnabled,
                OutboundCallsEnabled: isOutboundCallsEnabled,
                IdentityManagementType: process.env.IdentityManagementType,
                ClientToken: process.env.ClientToken,
                DirectoryId: process.env.DirectoryId,
                InstanceAlias: process.env.InstanceAlias
            };

            // Create AWS Connect instance using specified parameters
            if (event.RequestType == "Create") {
                return await connect.createInstance(createInstanceParams).promise();
                // I can store this in a variable and read the contents fine, but...
                // returning the promise does not terminate execution
            }
        };


  InvokeCreateConnectInstance:
    Type: Custom::CreateConnectInstance
    Properties:
      ServiceToken: !GetAtt CreateConnectInstance.Arn
      Region: !Ref "AWS::Region"

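The documentation at https://docs.aws.amazon.com/lambda/latest/dg/nodejs-handler.html explicitly states that you should be able to return await apiCall.promise() directly from any async function, which is exactly what I am trying to do, for example: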

const AWS = require('aws-sdk')
const s3 = new AWS.S3()

exports.handler = async function(event) {
  return s3.listBuckets().promise()
}

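Why can't my async function return? Again, the API calls work: the Connect instance is created and deleted (though I have omitted the delete code for brevity), but CF just hangs for hours until it eventually reports "Custom Resource failed to stabilize in expected time".

Here is the inline code on its own, for readability: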

        exports.handler = async function(event) {
            console.log("REQUEST RECEIVED:\n" + JSON.stringify(event));
            var connect = new aws.Connect({region: event.ResourceProperties.Region});
            var isInboundCallsEnabled = (process.env.InboundCallsEnabled == 'true');
            var isOutboundCallsEnabled = (process.env.OutboundCallsEnabled == 'true');
            var createInstanceParams = {
                InboundCallsEnabled: isInboundCallsEnabled,
                OutboundCallsEnabled: isOutboundCallsEnabled,
                IdentityManagementType: process.env.IdentityManagementType,
                ClientToken: process.env.ClientToken,
                DirectoryId: process.env.DirectoryId,
                InstanceAlias: process.env.InstanceAlias
            };

            // Create AWS Connect instance using specified parameters
            if (event.RequestType == "Create") {
                return await connect.createInstance(createInstanceParams).promise();
                // I can store this in a variable and read the contents fine, but...
                // returning the promise does not terminate CF execution
            }
        };

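Update: I have implemented the sendResponse method exactly as shown in the AMI lookup example (the first link) and am sending exactly the correct response structure, even including the newly created Connect instance ID in the Data field: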

{
    "Status": "SUCCESS",
    "Reason": "See the details in CloudWatch Log Stream: 2020/12/23/[$LATEST]6fef3553870b4fba90479a37b4360cee",
    "PhysicalResourceId": "2020/12/23/[$LATEST]6fef3553870b4fba90479a37b4360cee",
    "StackId": "arn:aws:cloudformation:us-east-1:642608065726:stack/cr12/1105a290-4534-11eb-a6de-0a8534d05dcd",
    "RequestId": "2f7c3d9e-941f-402c-b739-d2d965288cfe",
    "LogicalResourceId": "InvokeCreateConnectInstance",
    "Data": {
        "InstanceId": "2ca7aa49-9b20-4feb-8073-5f23d63e4cbc"
    }
}

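Despite this, the custom resource in CloudFormation still will not close. I do not understand why this happens when I am sending the above to event.ResponseURL. It is as if specifying an async handler completely breaks the custom resource handler and prevents it from closing.

Update: when I manually curl the above response directly to event.ResponseURL, the CF resource registers success! WTF... I am sending the exact same response that the Lambda function sends, yet CF accepts it from curl but not from my Lambda function.

Update: latest code, including sendResponse, etc.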

var aws = require("aws-sdk");
exports.handler = async function(event, context, callback) {
    console.log("REQUEST RECEIVED:\n" + JSON.stringify(event));
    var connect = new aws.Connect({region: event.ResourceProperties.Region});
    var isInboundCallsEnabled = (process.env.InboundCallsEnabled == 'true');
    var isOutboundCallsEnabled = (process.env.OutboundCallsEnabled == 'true');
    var createInstanceParams = {
        InboundCallsEnabled: isInboundCallsEnabled,
        OutboundCallsEnabled: isOutboundCallsEnabled,
        IdentityManagementType: process.env.IdentityManagementType,
        ClientToken: process.env.ClientToken,
        DirectoryId: process.env.DirectoryId,
        InstanceAlias: process.env.InstanceAlias
    };
    var responseStatus;
    var responseData = {};

    // Create Connect instance
    if (event.RequestType == "Create") {
        try {
            var createInstanceRequest = await connect.createInstance(createInstanceParams).promise();
            responseStatus = "SUCCESS";
            responseData = {"InstanceId": createInstanceRequest.Id};
        } catch (err) {
            responseStatus = "FAILED";
            responseData = {Error: "CreateInstance failed"};
            console.log(responseData.Error + ":\n", err);
        }
        sendResponse(event, context, responseStatus, responseData);
        return;
    }

    // Look up the ID and call deleteInstance.
    if (event.RequestType == "Delete") {
        var instanceId;
        var listInstanceRequest = await connect.listInstances({}).promise();
        listInstanceRequest.InstanceSummaryList.forEach(instance => {
            if (instance.InstanceAlias == createInstanceParams.InstanceAlias) {
                instanceId = instance.Id;
            }
        });
        if (instanceId !== undefined) {
            try {
                var deleteInstanceRequest = await connect.deleteInstance({"InstanceId": instanceId}).promise();
                responseStatus = "SUCCESS";
                responseData = {"InstanceId": instanceId};
            } catch (err) {
                responseStatus = "FAILED";
                responseData = {Error: "DeleteInstance call failed"};
                console.log(responseData.Error + ":\n", err);
            }
        } else {
            responseStatus = "FAILED";
            responseData = {Error: "DeleteInstance failed; no match found"};
            console.log(responseData.Error);
        }
        sendResponse(event, context, responseStatus, responseData);
        return;
    }
};

// Send response to the pre-signed S3 URL 
function sendResponse(event, context, responseStatus, responseData) {
    var responseBody = JSON.stringify({
        Status: responseStatus,
        Reason: "CloudWatch Log Stream: " + context.logStreamName,
        PhysicalResourceId: context.logStreamName,
        StackId: event.StackId,
        RequestId: event.RequestId,
        LogicalResourceId: event.LogicalResourceId,
        Data: responseData
    });
    console.log("RESPONSE BODY:\n", responseBody);
    var https = require("https");
    var url = require("url");
    var parsedUrl = url.parse(event.ResponseURL);
    var options = {
        hostname: parsedUrl.hostname,
        port: 443,
        path: parsedUrl.path,
        method: "PUT",
        headers: {
            "content-type": "",
            "content-length": responseBody.length
        }
    };
    console.log("SENDING RESPONSE...\n");
    var request = https.request(options, function(response) {
        console.log("STATUS: " + response.statusCode);
        console.log("HEADERS: " + JSON.stringify(response.headers));
        // Tell AWS Lambda that the function execution is done  
        context.done();
    });
    request.on("error", function(error) {
        console.log("sendResponse Error:" + error);
        // Tell AWS Lambda that the function execution is done  
        context.done();
    });
    // write data to request body
    request.write(responseBody);
    request.end();
}

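This has been driving me crazy for two days :(

In the logs, "RESPONSE BODY" shows up exactly as I copied it above, and the log shows "SENDING RESPONSE", but execution never reaches the "STATUS: " and "HEADERS: " part of the https.request() call, which makes me think async is interfering with this call... I don't know.

3 Answers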

16

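This one was tricky, but I finally have it all figured out. I had to make the sendResponse function asynchronous by adding a promise to it, awaiting that promise, and returning it. That let me call "return await sendResponse(event, context, responseStatus, responseData);" and finally everything works: both create and delete succeed, and the CloudFormation custom resource completes as expected. Phew. I am posting the code here in the hope that others will benefit from it.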

var aws = require("aws-sdk");
exports.handler = async function(event, context, callback) {
    console.log("REQUEST RECEIVED:\n" + JSON.stringify(event));
    var connect = new aws.Connect({region: event.ResourceProperties.Region});
    var isInboundCallsEnabled = (process.env.InboundCallsEnabled == 'true');
    var isOutboundCallsEnabled = (process.env.OutboundCallsEnabled == 'true');
    var createInstanceParams = {
        InboundCallsEnabled: isInboundCallsEnabled,
        OutboundCallsEnabled: isOutboundCallsEnabled,
        IdentityManagementType: process.env.IdentityManagementType,
        ClientToken: process.env.ClientToken,
        DirectoryId: process.env.DirectoryId,
        InstanceAlias: process.env.InstanceAlias
    };
    var responseStatus;
    var responseData = {};
    if (event.RequestType == "Create") {
        try {
            var createInstanceRequest = await connect.createInstance(createInstanceParams).promise();
            responseStatus = "SUCCESS";
            responseData = {"InstanceId": createInstanceRequest.Id};
        } catch (err) {
            responseStatus = "FAILED";
            responseData = {Error: "CreateInstance failed"};
            console.log(responseData.Error + ":\n", err);
        }
        return await sendResponse(event, context, responseStatus, responseData);
    }

    if (event.RequestType == "Delete") {
        var instanceId;
        var listInstanceRequest = await connect.listInstances({}).promise();
        listInstanceRequest.InstanceSummaryList.forEach(instance => {
            if (instance.InstanceAlias == createInstanceParams.InstanceAlias) {
                instanceId = instance.Id;
            }
        });
        if (instanceId !== undefined) {
            try {
                var deleteInstanceRequest = await connect.deleteInstance({"InstanceId": instanceId}).promise();
                responseStatus = "SUCCESS";
                responseData = {"InstanceId": instanceId};
            } catch (err) {
                responseStatus = "FAILED";
                responseData = {Error: "DeleteInstance call failed"};
                console.log(responseData.Error + ":\n", err);
            }
        } else {
            responseStatus = "FAILED";
            responseData = {Error: "DeleteInstance failed; no match found"};
            console.log(responseData.Error);
        }
        return await sendResponse(event, context, responseStatus, responseData);
    }
};

async function sendResponse(event, context, responseStatus, responseData) {
    let responsePromise = new Promise((resolve, reject) => {
        var responseBody = JSON.stringify({
            Status: responseStatus,
            Reason: "CloudWatch Log Stream: " + context.logStreamName,
            PhysicalResourceId: context.logStreamName,
            StackId: event.StackId,
            RequestId: event.RequestId,
            LogicalResourceId: event.LogicalResourceId,
            Data: responseData
        });
        console.log("RESPONSE BODY:\n", responseBody);
        var https = require("https");
        var url = require("url");
        var parsedUrl = url.parse(event.ResponseURL);
        var options = {
            hostname: parsedUrl.hostname,
            port: 443,
            path: parsedUrl.path,
            method: "PUT",
            headers: {
                "content-type": "",
                "content-length": responseBody.length
            }
        };
        console.log("SENDING RESPONSE...\n");
        var request = https.request(options, function(response) {
            console.log("STATUS: " + response.statusCode);
            console.log("HEADERS: " + JSON.stringify(response.headers));
            resolve(JSON.parse(responseBody));
            context.done();
        });
        request.on("error", function(error) {
            console.log("sendResponse Error:" + error);
            reject(error);
            context.done();
        });
        request.write(responseBody);
        request.end();
    });
    return await responsePromise;
}
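
Why awaiting the promise matters can be reproduced without AWS. The following is a minimal sketch, not the Lambda runtime itself: fakePut is a hypothetical stand-in for the HTTPS PUT to event.ResponseURL, and the log array only records ordering. For an async handler, Lambda treats the invocation as finished once the returned promise settles, so a request that was started but never awaited may not get the chance to complete.

```javascript
// Hypothetical stand-in for the HTTPS PUT: it finishes on a later tick,
// the way a real network request would.
function fakePut(log, callback) {
    setTimeout(() => {
        log.push("PUT completed");
        callback(null, { statusCode: 200 });
    }, 10);
}

// Fire-and-forget: the handler's promise resolves before the PUT completes.
async function fireAndForgetHandler(log) {
    fakePut(log, () => {});
    log.push("handler returned");
}

// Promise wrapper around the callback-style call.
function putAsPromise(log) {
    return new Promise((resolve, reject) => {
        fakePut(log, (err, res) => (err ? reject(err) : resolve(res)));
    });
}

// Awaited: the handler's promise resolves only after the PUT has completed.
async function awaitedHandler(log) {
    await putAsPromise(log);
    log.push("handler returned");
}
```

At the moment fireAndForgetHandler resolves, its log does not yet contain "PUT completed", which matches the missing "STATUS: " line in the question's logs; awaitedHandler records the PUT before it returns.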

5
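AWS should publish documentation on how to return from a custom resource with an async handler, because at the moment only the synchronous case is covered. I will definitely use this as a template for future custom resources that need to be async. - user2774004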
2
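Thank you for this complete example and for your thought process while working through it. - John Greenfield
This is weird. It looks like AWS implemented cfn-response incorrectly, firing the HTTPS request without waiting for the response. Why this works for synchronous Lambdas but not async ones is a mystery, but perhaps an async Lambda terminates sooner than a synchronous one, preventing the HTTPS request from being sent? It would be great if someone from AWS could investigate, comment on, and fix their implementation of cfn-response and the related documentation. - Woodz
Thanks for the answer; I am surprised the AWS documentation is this bad. - newprogrammer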

4
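This answer is a variation on the OP's answer, for those using the "ZipFile" option of the "Code" property of an AWS::Lambda::Function resource in CloudFormation. The advantage of the ZipFile approach is that, besides allowing the Lambda code to be inlined in the CF template, it also automatically bundles a "cfn-response.js" function that is very similar to the "async function sendResponse" in the OP's answer. After gaining the insight about a promised response from the OP's answer (thanks, I was stuck and confused), this is how I incorporated the cfn-response function as an awaitable Promise to signal CF once my asynchronous AWS API calls (omitted for brevity) have completed: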
CreateSnapshotFunction:
    Type: AWS::Lambda::Function
    Properties:
        Runtime: nodejs12.x
        Handler: index.handler
        Timeout: 900 # 15 mins
        Code:
            ZipFile: !Sub |
                const resp = require('cfn-response');
                const aws = require('aws-sdk');
                const cf = new aws.CloudFormation({apiVersion: '2010-05-15'});
                const rds = new aws.RDS({apiVersion: '2014-10-31'});

                exports.handler = async function(evt, ctx) {
                    if (evt.RequestType == "Create") {
                        try {
                            // Query the given CF stack, determine its database
                            // identifier, create a snapshot of the database,
                            // and await an "available" status for the snapshot
                            let stack = await getStack(stackNameSrc);
                            let srcSnap = await createSnapshot(stack);
                            let pollFn = () => describeSnapshot(srcSnap.DBSnapshot.DBSnapshotIdentifier);
                            let continueFn = snap => snap.DBSnapshots[0].Status !== 'available';
                            await poll(pollFn, continueFn, 10, 89); // timeout after 14 min, 50 sec

                            // Send response to CF
                            await send(evt, ctx, resp.SUCCESS, {
                                SnapshotId: srcSnap.DBSnapshot.DBSnapshotIdentifier,
                                UpgradeRequired: upgradeRequired
                            });
                        } catch(err) {
                            await send(evt, ctx, resp.FAILED, { ErrorMessage: err } );
                        }
                    } else {
                        // Send success to CF for delete and update requests
                        await send(evt, ctx, resp.SUCCESS, {});
                    }
                };

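                // Note: this promise never settles. The invocation ends when
                // cfn-response calls ctx.done() after its HTTPS PUT finishes.
                function send(evt, ctx, status, data) {
                    return new Promise(() => { resp.send(evt, ctx, status, data) });
                }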
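
A variation on the send wrapper, as a sketch only: it assumes that cfn-response signals completion by calling context.done() once its HTTPS request has finished or failed, and sendAndWait and wrappedCtx are hypothetical names. Intercepting done() lets the promise settle, so the async handler can return normally after the response has been sent.

```javascript
// Hypothetical helper: settles once cfn-response reports that it is done.
// `resp` is the module returned by require('cfn-response').
function sendAndWait(resp, evt, ctx, status, data) {
    return new Promise((resolve, reject) => {
        // Inherit logStreamName and the rest from the real context,
        // but intercept done() instead of ending the invocation.
        const wrappedCtx = Object.create(ctx);
        wrappedCtx.done = (err) => (err ? reject(err) : resolve());
        // In case send() itself returns a promise, avoid an unhandled
        // rejection; completion is still observed through done().
        Promise.resolve(resp.send(evt, wrappedCtx, status, data)).catch(() => {});
    });
}
```

Inside the handler this would be used as await sendAndWait(resp, evt, ctx, resp.SUCCESS, {...}) in place of await send(...).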

0

Creating and deleting S3 objects via a CFN custom resource

Transform: 'AWS::Serverless-2016-10-31'

Resources:
    CustomResourceLambdaExecutionRole:
        Type: 'AWS::IAM::Role'
        Properties:
            AssumeRolePolicyDocument:
                Version: 2012-10-17
                Statement:
                    - Effect: Allow
                      Principal:
                          Service: lambda.amazonaws.com
                      Action:
                          - 'sts:AssumeRole'
            ManagedPolicyArns:
              - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
            Policies:
                - PolicyName: CustomS3Access
                  PolicyDocument:
                      Version: 2012-10-17
                      Statement:
                          - Effect: Allow
                            Action:
                                - logs:CreateLogGroup
                                - logs:CreateLogStream
                                - logs:PutLogEvents
                            Resource: '*'
                - PolicyName: S3Policy
                  PolicyDocument:
                      Version: 2012-10-17
                      Statement:
                          - Effect: Allow
                            Action:
                                - s3:List*
                                - s3:DeleteObject
                            Resource: '*'

    CustomResourceLambdaFunction:
        Type: AWS::Serverless::Function
        Properties:    
            InlineCode: |
                const AWS = require('aws-sdk');
                const S3Client = new AWS.S3({apiVersion: '2006-03-01', region:  "us-east-1"});

                exports.handler = async (event, context) => {
                    const { RequestType, ResourceProperties  } = event;

                    let responseData = {};
                    let noOfObjects;
                    responseData['Message'] = "Default setting";
                    const bucketParams = { Bucket: ResourceProperties.s3bucketName };

                    if(RequestType == 'Create'){
                        responseData['Message'] = "Resource creation successful!";
                    }
                    else if(RequestType == 'Update'){
                        responseData['Message'] = "Resource update successful!";
                    }
                    else if(RequestType == 'Delete'){
                        const data = await S3Client.listObjects(bucketParams).promise()

                        noOfObjects = data.Contents;
                        for (let i = 0; i < noOfObjects.length; i++) {
                            await S3Client.deleteObject({
                                Bucket: bucketParams.Bucket,
                                Key: noOfObjects[i].Key
                            }).promise();
                        }
                        responseData['Message'] = "Resource deletion successful!";
                        return await sendResponse(event, context, "SUCCESS", responseData);
                    }
                    return await sendResponse(event, context, "SUCCESS", responseData);
                };

                async function sendResponse(event, context, responseStatus, responseData) {
                    let responsePromise = new Promise((resolve, reject) => {
                        var responseBody = JSON.stringify({
                            Status: responseStatus,
                            Reason: "CloudWatch Log Stream: " + context.logStreamName,
                            PhysicalResourceId: context.logStreamName,
                            StackId: event.StackId,
                            RequestId: event.RequestId,
                            LogicalResourceId: event.LogicalResourceId,
                            Data: responseData
                        });
                        console.log("RESPONSE BODY:\n", responseBody);
                        var https = require("https");
                        var url = require("url");
                        var parsedUrl = url.parse(event.ResponseURL);
                        var options = {
                            hostname: parsedUrl.hostname,
                            port: 443,
                            path: parsedUrl.path,
                            method: "PUT",
                            headers: {
                                "content-type": "",
                                "content-length": responseBody.length
                            }
                        };
                        console.log("SENDING RESPONSE...\n");
                        var request = https.request(options, function(response) {
                            console.log("STATUS: " + response.statusCode);
                            console.log("HEADERS: " + JSON.stringify(response.headers));
                            resolve(JSON.parse(responseBody));
                            context.done();
                        });
                        request.on("error", function(error) {
                            console.log("sendResponse Error:" + error);
                            reject(error);
                            context.done();
                        });
                        request.write(responseBody);
                        request.end();
                    });
                    return await responsePromise;
                }

            Timeout: 20
            Handler: index.handler
            Runtime: nodejs12.x    
            Role: !GetAtt CustomResourceLambdaExecutionRole.Arn

    S3Bucket:
      Type: AWS::S3::Bucket
      Properties:
        BucketName: yaya-test-v50final

    CustomResource:
      Type: Custom::CustomResource
      Properties:
          ServiceToken: !GetAtt CustomResourceLambdaFunction.Arn
          s3bucketName: !Ref S3Bucket
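
One detail of the sendResponse helper used in these answers: content-length is taken from responseBody.length, which counts UTF-16 code units rather than bytes, so the header is too small as soon as the body contains a non-ASCII character. A hedged sketch of a promise-based PUT that uses Buffer.byteLength instead follows; putJson and its transport parameter are hypothetical names, and the parameter exists only so the function can be exercised without a real network call.

```javascript
const https = require("https");

// Hypothetical helper (not from the answers above): PUT a JSON body to a URL
// and resolve with the HTTP status code once the response has been consumed.
function putJson(targetUrl, payload, transport = https) {
    return new Promise((resolve, reject) => {
        const body = JSON.stringify(payload);
        const options = {
            method: "PUT",
            headers: {
                "content-type": "",
                // Byte length, not string length, so multi-byte characters count correctly.
                "content-length": Buffer.byteLength(body)
            }
        };
        const request = transport.request(targetUrl, options, (response) => {
            response.resume(); // drain the response so "end" fires
            response.on("end", () => resolve(response.statusCode));
        });
        request.on("error", reject);
        request.end(body);
    });
}
```

In a handler this would be awaited, for example await putJson(event.ResponseURL, responseObject), before the async function returns.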

To improve this answer (and earn some upvotes), could you add an explanation of why this solution works, or how it differs from the previous answers? - brasskazoo
