Lambda - 从S3导入CSV到RDS MySQL

8

我有一个Lambda函数,它从S3导入特定的CSV文件到MySQL。然而,CSV文件的大小约为1 GB。当我运行此代码时,它不能处理并超时。

//s3 to rds
const fs = require("fs");
const AWS = require('aws-sdk');
var mysql = require('mysql');
var config = require('./config.json');
const s3 = new AWS.S3({
  accessKeyId: 'XXXXXXXXXXXXXXX',
  secretAccessKey: 'XXXXXXXXXXXXXXXXXXXXXXXXXXxx'
});
var filePath = `localfilepath`;

var pool = mysql.createPool({
  host: config.dbhost,
  user: config.dbuser,
  password: config.dbpassword,
  database: config.dbname
});
pool.getConnection((err, connection) => {
  if (err) throw err;
  console.log("Connected!" + connection);

  var s3Params = {
    Bucket: '<your_bucket_name>',
    Key: '<your_key>'
  };
  s3.getObject(s3Params, function(err, result) {
    if (err) {
      throw new Error(err);
    } else {
      console.log('file stored successfully', result);
      fs.createWriteStream(filePath).write(result.Body);
      connection.query('TRUNCATE TABLE <table_name>', (err, result) => {
        if (err) {
         throw new Error(err);
        } else {
          console.log('table truncated');
          var query = `LOAD DATA LOCAL INFILE '<file_name>' INTO table <table_name> FIELDS TERMINATED BY ','  ENCLOSED BY '"' IGNORE 1 LINES `;
          connection.query(query, function(err, result) {
            if (err) throw err;
            console.log("Result: " + result);
            connection.release();
            fs.unlinkSync(filePath);
            console.log('file deleted');
          });
        }
      });
    }

  });
})

我该如何让它运行起来?


  1. 我并没有看到你实际上将文件保存在任何地方。它似乎是加载到内存中的。
  2. 在Lambda运行时环境中用于保存文件的总空间为半GB,因此您的文件太大无法与AWS Lambda一起使用。
- Mark B
我有哪些选项可以执行这个程序? - dang
在上传到S3之前将文件分割成较小的文件,或者使用ECS、EKS或EC2来运行导入而不是Lambda。 - Mark B
能否使用Lambda分割文件?恐怕我不能使用EC2或任何其他服务,只能用Lambda。 - dang
2
你可以尝试流式传输对象并分段写入。请参阅此处的“检索对象的字节范围”示例:https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html#getObject-property - Mark B
3个回答

3
根据讨论串,他们确实希望在某个时候实现,但是何时实现是一个最佳猜测的情况。
目前AWS Lambda在/tmp目录中有512MB的硬盘空间限制(如此处所述),因此fs.createWriteStream(filePath).write(result.Body);行在这里不应该工作,因为文件大小为1GB。错误将类似于"no space left on device"(从现有线程回顾中得知)。
然而,在这种情况下,从S3加载文件应该可以工作。Lambda在内存和CPU大小方面均按比例扩展,因此在这里可能会由于缺乏内存而超时(取决于您设置了什么)。此链接提供了一个很好的指示,告诉您需要设置什么(与您正在加载到内存与磁盘空间相比)。
我建议在此阶段将流拆分为512MB块(软件包可能有所帮助),并将它们分别存储在S3中,这样您就可以将此操作拆分为2个函数:
1.提取数据并在S3文件中进行拆分(还要截断您的表)。 2.从S3加载CSV数据到您的RDS中。
(您可以使用Cloudwatch Events来完成此操作)

你有可以供我参考的代码示例吗? - dang

2
您基本上有两个障碍需要克服:1)Lambda上的本地存储仅为512mb,2)Lambda具有15分钟的执行时间限制(您必须在函数上显式配置)。
为解决问题1,您可以使用S3 Select。它允许您对S3中的对象(CSV和JSON文件)执行SQL查询。在CSV文件上执行S3 select查询,并为检索到的每个记录将其插入队列,然后让其他工作人员将其插入数据库。您也可以直接插入到您的RDS,但速度可能会更慢。
以下是代码示例:
const AWS = require('aws-sdk');
var fs = require('fs');

const S3 = new AWS.S3();

exports.handler = async (event, context) => {
    try {
        const query = "SELECT * FROM s3object s WHERE s.id > '0'";
        const bucket = 'my-bucket';
        const key = 'data.csv';

        const params = {
            Bucket: bucket,
            Key: key,
            ExpressionType: 'SQL',
            Expression: query,
            InputSerialization: { CSV: { FileHeaderInfo: 'USE' } },
            OutputSerialization: { CSV: {} }
        }

        const data = await getDataUsingS3Select(params);
        context.succeed(data);
    } catch (error) {
        context.fail(error);
    }
};

const getDataUsingS3Select = async (params) => {
    return new Promise((resolve, reject) => {
        S3.selectObjectContent(params, (err, data) => {
            if (err) { reject(err); }

            // This is a stream of events
            data.Payload.on('data', (event) => {
                // event, there is data inside it
                if (event.Records) {
                    // do what you want with payload: send to a queue or direct to db
                    console.log('Row:', event.Records.Payload.toString('utf8'));
                }
            }).on('end', () => {
                // we arrive here after processing everything
                resolve();
            });
        });
    })
}

如果您仍然超过了15分钟的时间限制,那么这就是问题2。首先,在SQL中添加一个limit子句。然后,您可以在Lambda的/tmp目录中创建一个“检查点”文件。您可以将上次处理的最后一条记录的id保存在该文件中,以便在重新运行Lambda函数时,它可以读取该文件,获取id并在查询的where子句中使用它,例如:
select * from s3object s where s.id > '99' limit 50000

15分钟的限制,你有可以使用的代码吗? - dang

1
如果您的主要目标是将S3中的CSV文件导入RDS MySQL中,则请查看AWS数据管道。它已经拥有了您在将S3数据加载到Amazon RDS MySQL表中完成此常见任务所需的所有定义资源,但它使用EC2实例。但与此同时,它更容易扩展和维护解决方案。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接