Extracting a tar file as a stream in Node


I'm trying to write an AWS Lambda that takes a tar.gz file from an S3 bucket, gunzips and untars it, and streams the extracted files to another S3 bucket.

Here is my code:

var AWS = require('aws-sdk');
var fs = require('fs');
var zlib = require('zlib');
var uuid = require('uuid/v4');
var tar = require('tar-stream')
var pack = tar.pack()
var s3 = new AWS.S3();

exports.handler = (event, context, callback) => {
  var bucket = event.Records[0].s3.bucket.name;
  var key = event.Records[0].s3.object.key;

  var file = 'S3://' + bucket + '/' + key;

  console.log(bucket)
  console.log(key)

  var readParams = {
    Bucket: bucket,
    Key: key
  };

  var dataStream = s3.getObject(readParams).createReadStream();

  var extract = tar.extract()

  extract.on('entry', function(header, stream, next) {
    console.log(header.name)
    var writeParams = {
      Bucket: process.env.JOB_PROCESSING_BUCKET,
      Key: uuid() + '-' + header.name,
      Body: stream
    };

    s3.upload(writeParams).
    on('httpUploadProgress', function(evt) {
      console.log('Progress:', evt.loaded, '/', evt.total);
    }).
    send(function(err, data) {
      if (err) console.log("An error occurred", err);
      console.log("Uploaded the file at", data.Location);
    });
    stream.on('end', function() {
      next() // ready for next entry
    })
    stream.resume() // just auto drain the stream
  })

  extract.on('finish', function() {
    // all entries read
  })

  dataStream.pipe(zlib.createGunzip()).pipe(extract);

  callback(null, 'Gunzip Lambda Function');
};

It pulls the file and gunzips it, and I can see each file being extracted on entry. The code then tries to stream the file to S3, which creates a 0 kb file, appears to hang as if reading the stream, then continues on to the next entry.

Why does it seem unable to read/process the stream body? Is there a better way to do this?

Thanks

2 Answers

I'm not sure whether this is the best solution, but the following code works for me. The difference that appears to matter is the PassThrough stream: s3.upload reads from the PassThrough rather than from the tar entry stream itself, so draining the entry with resume() no longer competes with the upload for the same data.
const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const tar = require('tar-stream');
const zlib = require('zlib');
const stream = require('stream');
const uuid = require('uuid');

exports.get = (event, context) => {

  var params = {
      Bucket: event.Records[0].s3.bucket.name,
      Key: event.Records[0].s3.object.key
  };

  var dataStream = s3.getObject(params).createReadStream();

  var extract = tar.extract();

  extract.on('entry', function(header, inputStream, next) {

      inputStream.pipe(uploadFromStream(s3, header, context));

      inputStream.on('end', function() {
          next(); // ready for next entry
      });

      inputStream.resume(); // just auto drain the stream
  });

  extract.on('finish', function() {
      // all entries read
  });

  dataStream.pipe(zlib.createGunzip()).pipe(extract);

}

function uploadFromStream(s3, header, context) {
    var pass = new stream.PassThrough();

    var writeParams = {
        Bucket: process.env.JOB_PROCESSING_BUCKET,
        Key: uuid.v1() + '-' + header.name,
        Body: pass
    };

    // NOTE: context.done ends the invocation as soon as this upload
    // finishes; with multi-file archives you may want to wait for all
    // uploads instead (see the sketch below).
    s3.upload(writeParams, function(err, data) {
        context.done(err, data);
    });

    return pass;
}
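
For what it's worth, the same PassThrough idea can be written so the Lambda only returns once every upload has completed, instead of ending the invocation from inside uploadFromStream. This is only a sketch, assuming aws-sdk v2 (where s3.upload(...).promise() is available), a Node 8.10+ async handler, the uuid package's v4 export, and the same JOB_PROCESSING_BUCKET environment variable as above:

const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const tar = require('tar-stream');
const zlib = require('zlib');
const { PassThrough, pipeline } = require('stream');
const { v4: uuidv4 } = require('uuid');

exports.handler = async (event) => {
  const params = {
    Bucket: event.Records[0].s3.bucket.name,
    Key: event.Records[0].s3.object.key
  };

  const extract = tar.extract();
  const uploads = []; // one upload promise per tar entry

  extract.on('entry', (header, entryStream, next) => {
    const pass = new PassThrough();
    uploads.push(s3.upload({
      Bucket: process.env.JOB_PROCESSING_BUCKET,
      Key: uuidv4() + '-' + header.name,
      Body: pass
    }).promise());
    entryStream.pipe(pass);
    // advance once this entry is fully consumed; newer tar-stream
    // versions may need 'finish' instead (see the answer below)
    entryStream.on('end', next);
  });

  // pipeline forwards errors from every stage, unlike .pipe()
  await new Promise((resolve, reject) => {
    pipeline(
      s3.getObject(params).createReadStream(),
      zlib.createGunzip(),
      extract,
      (err) => (err ? reject(err) : resolve())
    );
  });

  // don't return until every S3 upload has completed
  await Promise.all(uploads);
};

Waiting on Promise.all matters because Lambda suspends the execution environment as soon as the handler returns, which would otherwise cut off in-flight uploads.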


Tried for a few hours to get this working, and it turns out the 'end' event has been replaced by 'finish'. So the answer above works great, it just needs one small change:

inputStream.on('end', function() {
  next(); // ready for next entry
});

should be:

inputStream.on('finish', function() {
  next(); // ready for next entry
});
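
If the tar-stream version in play is unclear, one defensive option (a sketch, not part of the original answer; it assumes context is passed through to uploadFromStream as in the first answer) is stream.finished from Node's standard library, whose callback fires on 'end', 'finish', or an error:

const { finished } = require('stream');

extract.on('entry', function(header, inputStream, next) {
  inputStream.pipe(uploadFromStream(s3, header, context));

  // finished() fires on 'end', 'finish', or an error, so it works
  // regardless of which event this tar-stream version emits
  finished(inputStream, function(err) {
    if (err) console.log('entry stream error', err);
    next(); // ready for next entry
  });
});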
