使用Java编写Lambda函数实现AWS S3事件通知

13
我想使用Lambda函数来处理S3 Put事件通知。我的Lambda函数应该在我将任何新的JSON文件放入/添加到S3存储桶中时被调用。 我的挑战是,没有足够的文档来实现Java中此类Lambda函数的实现。我找到的大多数文档都是针对Node.js的。
我希望我的Lambda函数被调用,然后在Lambda函数内部,我想要消耗那些已添加的json,然后将该JSON发送到AWS ES服务。
但是,我应该使用哪些类呢?有人有关于这个的任何想法吗? S3和ES都已设置和运行。Lambda的自动生成代码是`。
@Override
public Object handleRequest(S3Event input, Context context) {
    context.getLogger().log("Input: " + input);

    // TODO: implement your handler
    return null;
}

接下来呢??


1
好的,启动它!激活并上传文件。你需要的第一个“类”只是你的眼睛——看一下记录的输入。在那之后,其余的应该就很明显了。输入应该是一个S3事件通知,告诉你刚刚上传的对象的信息。 - Michael - sqlbot
2个回答

13

在Lambda中处理S3事件是可行的,但需要记住的是,S3Event对象仅传输到对象的引用而非对象本身。要获取实际对象,您必须自己调用AWS SDK。 在lambda函数中请求S3对象将如下所示:

public Object handleRequest(S3Event input, Context context) {
    AmazonS3Client s3Client = new AmazonS3Client(new DefaultAWSCredentialsProviderChain());        

    for (S3EventNotificationRecord record : input.getRecords()) {
        String s3Key = record.getS3().getObject().getKey();
        String s3Bucket = record.getS3().getBucket().getName();
        context.getLogger().log("found id: " + s3Bucket+" "+s3Key);
        // retrieve s3 object
        S3Object object = s3Client.getObject(new GetObjectRequest(s3Bucket, s3Key));
        InputStream objectData = object.getObjectContent();
        //insert object into elasticsearch
    }        
    return null;
}

现在比较困难的部分是将该对象插入ElasticSearch。遗憾的是,AWS SDK没有提供任何相关功能。默认方法是通过对AWS ES端点进行REST调用来实现。有许多示例可以帮助你了解如何调用Elasticsearch实例。

一些人似乎选择使用以下项目:

Jest - Elasticsearch Java Rest Client


谢谢Jens。我已经完成了简单部分,现在正在努力解决困难的部分。尝试只使用Jest将消息添加到ES中,但不知何故,好像那个部分缺失了。所有类似搜索、获取等操作都已显示,但是添加文档的部分却没有。下面这行代码对我来说报错了:DocumentMapper documentMapper = new DocumentMapper.Builder("groupId", null, rootObjectMapperBuilder).build(null); - NGR
1
说实话,我从未使用过DocumentMapper,因为这一步只需要执行一次,我通常通过Kibana GUI生成我的映射。除此之外,我只使用API将文档附加到索引中。 - jens walter
谢谢你的帮助 Jens。我使用了 Apache HTTP 来创建 REST API,并通过 Postman 创建了我的索引和映射。现在它可以正常工作了。 - NGR

7

最后,以下是使用Java进行S3 --> Lambda --> ES集成的步骤。

  1. Have your S3, Lamba and ES created on AWS. Steps are here.
  2. Use below Java code in your lambda function to fetch a newly added object in S3 and send it to ES service.

    public Object handleRequest(S3Event input, Context context) {
    AmazonS3Client s3Client = new AmazonS3Client(new DefaultAWSCredentialsProviderChain());        
    
    for (S3EventNotificationRecord record : input.getRecords()) {
        String s3Key = record.getS3().getObject().getKey();
        String s3Bucket = record.getS3().getBucket().getName();
        context.getLogger().log("found id: " + s3Bucket+" "+s3Key);
        // retrieve s3 object
        S3Object object = s3Client.getObject(new GetObjectRequest(s3Bucket, s3Key));
        InputStream objectData = object.getObjectContent();
    
        //Start putting your objects in AWS ES Service
        String esInput = "Build your JSON string here using S3 objectData";
    
        HttpClient httpClient = new DefaultHttpClient();
    
        HttpPut putRequest = new HttpPut(AWS_ES_ENDPOINT + "/{Index_name}/{product_name}/{unique_id}" );
    
        StringEntity input = new StringEntity(esInput);
        input.setContentType("application/json");
        putRequest.setEntity(input);
    
        httpClient.execute(putRequest);
        httpClient.getConnectionManager().shutdown();
    
    }        
    return "success";}
    
  3. Use either Postman or Sense to create Actual index & corresponding mapping in ES.

  4. Once done, download and run proxy.js on your machine. Make sure you setup ES Security steps suggested in this post

  5. Test setup and Kibana by running http://localhost:9200/_plugin/kibana/ URL from your machine.

  6. All is set. Go ahead and set your dashboard in Kibana. Test it by adding new objects in your S3 bucket


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接