如何在AWS Lambda中使用Golang支持多个触发器?

14

我正在使用Golang构建一个AWS Lambda函数,用于将n到m个S3存储桶中的内容复制。有一个要求是支持S3触发器以及从SQS获取数据,在那里存储了所有源S3存储桶的更改。代码可以在这里找到:https://github.com/maknahar/s3copy

我尝试了以下方法:

func main() {
    lambda.Start(ProcessIncomingS3Events)
    lambda.Start(ProcessIncomingEvents)
}

func ProcessIncomingS3Events(event events.S3Event) error {
    ...
    log.Println("Got S3 Event")
    return processS3Trigger(config, event)
}

func ProcessIncomingEvents() error {
    ...
    log.Println("Defaulting to SQS")
    return processSQSMessage(config)
}
在这种情况下,每次都会触发第一个事件ProcessIncomingS3Events
我也尝试了以下操作。
func main() {
    lambda.Start(ProcessIncomingEvents)
}

func ProcessIncomingEvents(event interface{}) error {
    ...
    switch request := event.(type) {
    case events.S3Event:
        log.Println("Got S3 Event")
        return processS3Trigger(config, request)

    case types.Nil:
        log.Println("Defaulting to SQS")
        return processSQSMessage(config)

    default:
        log.Println("Could not find the event type")

    }

    return nil
}
在这种情况下,Lambda无法检测到类型,并且在每个触发器中记录了“Could not find the event type”。 是否有一种方式通过AWS SDK支持函数的多个触发器?

为什么不为不同的事件类型分离Lambda函数? - raevilman
创建不同的 Lambda 函数需要维护同样数量的函数,而且它们都做同样的工作。目前,我没有设置 CI 进行部署,所以更改代码或配置将需要为所有函数进行手动设置。 - Mayank Patel
另一个原因可能是由于 Lambda 函数是 VPC 的一部分。为了避免产生 VPC 冷启动延迟,最好有一个定期的 ping 事件来保持函数的热度,同时处理我们实际想要处理的任何类型的事件。 - Harris Lummis
5个回答

6

通过实现 AWS Handler 接口,我成功监听了多个事件,它定义了一个方法:

Invoke(ctx context.Context, payload []byte) ([]byte, error)

我按照以下方式实现了一个多事件处理程序
type Handler struct {
//add global variables or context information that your handler may need
}

func (h Handler) Invoke(ctx context.Context, data []byte) ([]byte, error) {
  //for demonstration purposes, not the best way to handle
  apiGatewayEvent := new(events.APIGatewayProxyRequest)
  if err := json.Unmarshal(data, apiGatewayEvent); err != nil {
    log.Println("Not a api gateway event")
  }
  snsEvent := new(events.SNSEvent)
  if err := json.Unmarshal(data, snsEvent); err != nil {
    log.Println("Not a sns event")
  }
  return nil, nil
}

func main() {
  lambda.StartHandler(Handler{})
}

正如您所看到的,您可以获取任何事件的原始字节并根据需要处理它们,从而使您有可能使用相同的 Lambda 监听任何 AWS 事件。然而,在使用此方法之前,请仔细考虑,因为正如上面所指出的那样,Lambda 最好只用于处理一种类型的事件。


2
无法工作。我们遇到了错误:json:无法将对象取消编组为类型为[]uint8的Go值:UnmarshalTypeError null,而bytes []表示正确的对象:S3Event。 - chendu

2
您可以配置多个事件源以触发一个或多个Lambda函数。
然而,在Go语言中,lambda.Start调用是阻塞的,因此编写一个处理多个事件类型的单个函数并不容易。强烈建议您为每个事件源创建一个单独的Lambda函数。
Go的惯用解决方案是在主包中定义函数逻辑,并编写多个程序来获取源事件并调用函数。因此,项目布局将如下:
  • s3copy/main.go
  • s3copy/handlers/s3/main.go
  • s3copy/handlers/sqs/main.go
请参见我的Lambda和Go应用程序样板,了解示例项目布局和Makefile。

1
我不知道你是否已经找到解决方案,但我已经找到了解决方案。
在这里。。
package main

import (
    "errors"
    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
)

func main() {
    lambda.Start(Handler)
}

type Event1 struct {
    //Event Attributes
}

type Event2 struct {
    //Event Attributes
}
type Composite struct {
    *Event1
    *Event2
    *events.S3Event
}

func Handler(c Composite) error {
    if c.Event1 != nil {
        //go along with code based on Event1
    } else if c.Event2 != nil {
        //go along with code based on Event2
    } else if c.S3Event != nil {
        //go along with code based on S3Event
    } else {
        return errors.New("wrong event type")
    }
    return nil
}

这将会很好地工作,您可以看到无论哪个事件被触发,该结构体都不会为空,并且您可以通过在Composite结构体中添加它们的指针来轻松添加额外的事件,就像我添加了*Event1*Event2一样。

有一个缺点是您不能将SQSEvent和S3Event放在一起,因为它们的属性具有相同的名称和json标记Record。 此外,如果两个事件具有相同的属性,那么这也会产生问题,因为它将在不同结构中相同的所有属性中放置零值

还有一种解决方案可以克服上述问题。

package main

import (
    "encoding/json"
    "errors"
    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
)

func main() {
    lambda.Start(Handler)
}

type CustomEvent struct {
    //Give EventSource "aws:manual" value to determine event is manual
    EventSource string `json:"eventSource"`

    //Other CustomEvent Attributes
}

func Handler(event map[string]interface{}) error {
    //marshal map to get json of data
    eventMarshaled, err := json.Marshal(event)
    if err != nil {
        return err
    }

    //Flag for determining if any one of the event is parsed

    var eventParsed bool

    //declare CustomEvent variable to use while Unmarshaling
    var c CustomEvent
    if err := json.Unmarshal(eventMarshaled, &c); err == nil {
        if c.EventSource == "aws:manual" {
            eventParsed = true

            //your code based on CustomEvent
        }
        return nil
    }
    if !eventParsed {
        //declare SQSEvent variable to use while Unmarshaling
        var sqsEvent events.SQSEvent
        if err := json.Unmarshal(eventMarshaled, &sqsEvent); err == nil {
            for _, message := range sqsEvent.Records {
                if message.EventSource == "aws:sqs" {
                    eventParsed = true
                    //Your Code based on sqs event
                }
            }
            return nil
        }
    }

    if !eventParsed {
        //declare S3Event variable to use while Unmarshaling
        var s3Event events.S3Event
        if err := json.Unmarshal(eventMarshaled, &s3Event); err == nil {
            for _, message := range s3Event.Records {
                if message.EventSource == "aws:sqs" {
                    eventParsed = true
                    //Your Code based on s3 event
                }
            }
            return nil
        }
    }

    return errors.New("wrong event type")
}

正如您所看到的,处理程序默认从aws sdk获取map[string]interface{},您只需要将其编组然后解组为所需的类型。您必须为所有自定义事件提供eventSource属性,并检查它们的eventSource,以便可以根据此执行操作。 此外,在发送除aws特定事件(如SQS、SNS、S3等)之外的手动事件时,请不要忘记发送eventSource。您可以在上面的示例中看到如何检查SQS和S3的eventSource。您可以在aws提供的大多数事件中找到eventSource属性,但我不确定,但是您可以轻松地找到每个aws提供的事件中的一些唯一属性,这些属性可以用于检查nil,然后您可以确定它是哪种类型的事件。 这种方法修复了第一种解决方案中的问题,但也添加了不必要的json编组和解组,而且还有不同的方法来检查不同的事件,但我认为没有百分之百的方法来做到这一点。


0

您可以使用Go语言中的嵌入来解决这个问题:

import (
    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    "reflect"
)

type Event struct {
    events.SQSEvent
    events.APIGatewayProxyRequest
    //other event type
}

type Response struct {
    events.SQSEventResponse `json:",omitempty"`
    events.APIGatewayProxyResponse `json:",omitempty"`
   //other response type
}

func main() {
    lambda.Start(eventRouter)
}

func eventRouter(event Event) (Response, error) {
    var response Response
    switch {
    case reflect.DeepEqual(event.APIGatewayProxyRequest, events.APIGatewayProxyRequest{}):
        response.SQSEventResponse = sqsEventHandler(event.SQSEvent)
    case reflect.DeepEqual(event.SQSEvent, events.SQSEvent{}):
        response.APIGatewayProxyResponse = apiGatewayEventHandler(event.APIGatewayProxyRequest)
  //another case for a event handler
    }
    return response, nil
}

func sqsEventHandler(sqsEvent events.SQSEvent) events.SQSEventResponse {
    //do something with the SQS event 
}

func apiGatewayEventHandler(apiEvent events.APIGatewayProxyRequest) events.APIGatewayProxyResponse {
    //do something with the API Gateway event
}

注意:如果基本事件具有相同的字段名称,则需要寻找另一个DeepEqual比较方法实例。

第一篇帖子,我希望能帮助到某人。 - Agu-GC

0
在第一种方法中,您直接在第一条语句中调用ProcessIncomingS3Events,因此每次都会调用它。
阅读this - Lambda函数处理程序(Go)
在上面的链接中,作者正在解析事件的名称字段。同样,您可以检查任何始终存在于S3事件中的字段,例如“eventSource”:“aws:s3”(S3事件结构在here中查看)
如果存在,则为S3事件,否则为其他事件。或者您还可以检查SQS事件的字段。

我实际上正在寻找一种方式,使SDK提供添加多个触发器的功能。在我的当前用例中,有可能添加更多的触发器。因此,我不能像您提供的链接示例中那样硬编码一个事件类型。然而,您提供的链接帮助我了解可以将什么作为输入事件,并感谢您的帮助。 - Mayank Patel
1
Lambda SDK 尝试将事件(来自源的 JSON)直接映射到 Lambda 处理程序中指定的类型。因此,从 SDK 中不可能实现。您可以编写自己的实用程序库(包装器)来执行此操作,并随着未来的更多要求进行改进。例如,在我的一个项目中,需要维护跟踪目的的状态。因此,我编写了一个具有所需功能的超级 Lambda,现在所有其他 Lambda 都是从那个 Lambda 扩展而来的。您可以考虑拥有一个面向 Lambda,仅将字符串作为事件,并执行答案中建议的所有检查,然后将其转发到正确的处理程序。希望这有所帮助。 - raevilman

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接