我有一个Python Lambda函数,可以响应物联网按钮和Alexa技能。
是否有一种方法可以读取传递给处理程序函数的
是否有一种方法可以读取传递给处理程序函数的
event
或 context
来识别触发函数的服务(Alexa或IoT)? event
或 context
来识别触发函数的服务(Alexa或IoT)?希望AWS现在已经改进了这一点,但遗憾的是还没有。在所有AWS服务中,没有一个单一的参数可以用来确定事件类型。
不过,在网上找到了这篇不错的文章(链接)
function getLambdaEventSource(event) {
if (event.Records && event.Records[0].cf) return 'isCloudfront';
if (event.configRuleId && event.configRuleName && event.configRuleArn) return 'isAwsConfig';
if (event.Records && (event.Records[0].eventSource === 'aws:codecommit')) return 'isCodeCommit';
if (event.authorizationToken === "incoming-client-token") return 'isApiGatewayAuthorizer';
if (event.StackId && event.RequestType && event.ResourceType) return 'isCloudFormation';
if (event.Records && (event.Records[0].eventSource === 'aws:ses')) return 'isSes';
if (event.pathParameters && event.pathParameters.proxy) return 'isApiGatewayAwsProxy';
if (event.source === 'aws.events') return 'isScheduledEvent';
if (event.awslogs && event.awslogs.data) return 'isCloudWatchLogs';
if (event.Records && (event.Records[0].EventSource === 'aws:sns')) return 'isSns';
if (event.Records && (event.Records[0].eventSource === 'aws:dynamodb')) return 'isDynamoDb';
if (event.records && event.records[0].approximateArrivalTimestamp) return 'isKinesisFirehose';
if (event.records && event.deliveryStreamArn && event.deliveryStreamArn.startsWith('arn:aws:kinesis:')) return 'isKinesisFirehose';
if (event.eventType === 'SyncTrigger' && event.identityId && event.identityPoolId) return 'isCognitoSyncTrigger';
if (event.Records && event.Records[0].eventSource === 'aws:kinesis') return 'isKinesis';
if (event.Records && event.Records[0].eventSource === 'aws:s3') return 'isS3';
if (event.operation && event.message) return 'isMobileBackend';
}
无法可靠地完成此操作。你能做的最接近的方法是熟悉不同服务生成的各种事件内容,并(希望)在你感兴趣的每个系列中识别出一个可靠的唯一键,然后在代码中检查它,例如使用
if 'distinctKey' in event.keys():
# ...
然而,这几乎不是一种可靠的方法,因为它要求您:
针对Python开发人员:您可以查看此链接https://gist.github.com/Necromancerx/abed07138690d37d170a6cf15b40d749。
def get_lambda_event_source(self, event: dict):
if 'pathParameters' in event and 'proxy' in event['pathParameters']:
return 'api_gateway_aws_proxy'
elif 'Records' in event and len(event['Records']) > 0 and 'eventSource' in event['Records'][0] and event['Records'][0]['eventSource'] == 'aws:s3':
return 's3'
elif 'Records' in event and len(event['Records']) > 0 and 'EventSource' in event['Records'][0] and event['Records'][0]['EventSource'] == 'aws:sns':
return 'sns'
elif 'Records' in event and len(event['Records']) > 0 and 'eventSource' in event['Records'][0] and event['Records'][0]['eventSource'] == 'aws:dynamodb':
return 'dynamo_db'
elif 'Records' in event and len(event['Records']) > 0 and 'cf' in event['Records'][0]:
return 'cloudfront'
elif 'source' in event and event['source'] == 'aws.events':
return 'scheduled_event'
elif 'awslogs' in event and 'data' in event['awslogs']:
return 'cloud_watch_logs'
elif 'authorizationToken' in event and event['authorizationToken'] == "incoming-client-token":
return 'api_gateway_authorizer'
elif 'configRuleId' in event and 'configRuleName' in event and 'configRuleArn' in event:
return 'aws_config'
elif 'StackId' in event and 'RequestType' in event and 'ResourceType' in event:
return 'cloud_formation'
elif 'Records' in event and len(event['Records']) > 0 and 'eventSource' in event['Records'][0] and event['Records'][0]['eventSource'] == 'aws:codecommit':
return 'code_commit'
elif 'Records' in event and len(event['Records']) > 0 and 'eventSource' in event['Records'][0] and event['Records'][0]['eventSource'] == 'aws:ses':
return 'ses'
elif 'Records' in event and len(event['Records']) > 0 and 'eventSource' in event['Records'][0] and event['Records'][0]['eventSource'] == 'aws:kinesis':
return 'kinesis'
elif 'records' in event and len(event['Records']) > 0 and 'approximateArrivalTimestamp' in event['records'][0]:
return 'kinesis_firehose'
elif 'records' in event and len(event['Records']) > 0 and 'deliveryStreamArn' in event and event['deliveryStreamArn'] is str and event['deliveryStreamArn'].startswith('arn:aws:kinesis:'):
return 'kinesis_firehose'
elif 'eventType' in event and event['eventType'] == 'SyncTrigger' and 'identityId' in event and 'identityPoolId' in event:
return 'cognito_sync_trigger'
elif 'operation' in event and 'message' in event:
return 'is_mobile_backend'
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def my_logging_handler(event, context):
logger.info('got event{}'.format(event))
这将在云监视器中记录事件数据,以便您可以确定触发 Lambda 的事件
event
或context
的哪个部分,可以用来确定是什么衍生(IoT或ASK)调用了该函数。或者,如果(并且只有在)这不可能,那么我如何检查其他某些来源(例如日志)来确定呢? - orome