Node.js:如何将流读入缓冲区?

117

我写了一个相当简单的函数,可以从给定的URL下载图像,调整大小并上传到S3(使用'gm'和'knox'),但我不知道是否正确地将流读取到缓冲区中。(一切都正常工作,但这是正确的方式吗?)

另外,我想了解有关事件循环的一些内容,如何确定函数的一次调用不会泄漏任何内容或将“buf”变量更改为另一个已在运行的调用(或此情况不可能发生,因为回调是匿名函数?)

var http = require('http');
var https = require('https');
var s3 = require('./s3');
var gm = require('gm');

module.exports.processImageUrl = function(imageUrl, filename, callback) {
var client = http;
if (imageUrl.substr(0, 5) == 'https') { client = https; }

client.get(imageUrl, function(res) {
    if (res.statusCode != 200) {
        return callback(new Error('HTTP Response code ' + res.statusCode));
    }

    gm(res)
        .geometry(1024, 768, '>')
        .stream('jpg', function(err, stdout, stderr) {
            if (!err) {
                var buf = new Buffer(0);
                stdout.on('data', function(d) {
                    buf = Buffer.concat([buf, d]);
                });

                stdout.on('end', function() {
                    var headers = {
                        'Content-Length': buf.length
                        , 'Content-Type': 'Image/jpeg'
                        , 'x-amz-acl': 'public-read'
                    };

                    s3.putBuffer(buf, '/img/d/' + filename + '.jpg', headers, function(err, res) {
                        if(err) {
                            return callback(err);
                        } else {
                            return callback(null, res.client._httpMessage.url);
                        }
                    });
                });
            } else {
                callback(err);
            }
        });
    }).on('error', function(err) {
        callback(err);
    });
};

1
这些答案不能是真的,对吧?这个标准库函数在哪里? - jameshfisher
11个回答

110

总体而言,我没有看到任何可能会导致你的代码出现问题。

两个建议:

你合并Buffer对象的方式不够优化,因为它必须在每次“data”事件中复制所有已存在的数据。最好将这些块放入数组中,然后在结尾处将其全部连接起来。

var bufs = [];
stdout.on('data', function(d){ bufs.push(d); });
stdout.on('end', function(){
  var buf = Buffer.concat(bufs);
})

为了提高性能,建议检查您正在使用的 S3 库是否支持流式传输。理想情况下,您根本不需要创建一个大型缓冲区,而只需直接将 stdout 流传递给 S3 库即可。

至于您问题的第二部分,这是不可能实现的。当调用一个函数时,它会被分配一个独立的私有上下文,该上下文中定义的所有内容只能从该函数内部进行访问。

更新

将文件转储到文件系统可能意味着每个请求的内存使用量较少,但文件 IO 可能非常慢,因此可能不值得。我会说,在对此函数进行分析和压力测试之前,您不应过度优化。如果垃圾回收器正常工作,您可能过度优化了。

话虽如此,还有更好的方法,因此不要使用文件。由于您只需要长度,因此可以在不需要将所有缓冲区连接在一起的情况下计算长度,因此根本不需要分配新的缓冲区。

var pause_stream = require('pause-stream');

// Your other code.

var bufs = [];
stdout.on('data', function(d){ bufs.push(d); });
stdout.on('end', function(){
  var contentLength = bufs.reduce(function(sum, buf){
    return sum + buf.length;
  }, 0);

  // Create a stream that will emit your chunks when resumed.
  var stream = pause_stream();
  stream.pause();
  while (bufs.length) stream.write(bufs.shift());
  stream.end();

  var headers = {
      'Content-Length': contentLength,
      // ...
  };

  s3.putStream(stream, ....);

它支持流,但我需要知道S3头的Content-Length,而使用流是不可能的。 - Gal Ben-Haim
1
将“gm”流导入文件,然后从该文件打开流并使用文件大小作为Content-Length上传到S3,这是一种更好的做法吗?据我所知,这可以消除像我现在这样将整个文件加载到内存中的问题。 - Gal Ben-Haim
只想提醒一下,bufs.pop() 调用应该改为 bufs.unshift(),或者更简单的方法是用一个简单的 for 循环来替换整个 while 循环。 - Erhhung
在 on('data') 中,你可以直接使用 bytes += data.length 而不是在最后减少数组。 - Bergur
1
@Bergur 确实如此,但是这样你就必须维护两个单独的累加器变量。我更喜欢维护单个变量并稍后计算长度。我不确定这是否会对性能或其他方面产生明显的影响。 - loganfsmyth
显示剩余2条评论

71

Javascript片段

function stream2buffer(stream) {

    return new Promise((resolve, reject) => {
        
        const _buf = [];

        stream.on("data", (chunk) => _buf.push(chunk));
        stream.on("end", () => resolve(Buffer.concat(_buf)));
        stream.on("error", (err) => reject(err));

    });
} 

Typescript片段

async function stream2buffer(stream: Stream): Promise<Buffer> {

    return new Promise < Buffer > ((resolve, reject) => {
        
        const _buf = Array < any > ();

        stream.on("data", chunk => _buf.push(chunk));
        stream.on("end", () => resolve(Buffer.concat(_buf)));
        stream.on("error", err => reject(`error converting stream - ${err}`));

    });
} 

3
这个非常有效...这就是MVP(最有价值产品) - Jan
实际上,对我来说,stream2buffer() 的 JS 版本没有返回正确的值。 - Mike
1
嗨@MikeB,正如你所见,代码非常简单(也容易调试) 你能否提供更多有关“未返回适当值”的详细信息? - bsorrentino
@bsorrentino,我认为问题出在我返回值的方式上。在我的情况下,const pdfBuffered = \data:application/pdf;base64, ${Buffer.concat(chunks).toString("base64")}`;可以正常工作。因此,不仅仅是Buffer.concat(_buf)),而是Buffer.concat(chunks).toString("base64")`。 - Mike
难以置信这个不在标准库中。 - jameshfisher
显示剩余2条评论

29
注意:这仅回答了“如何将流读入缓冲区?”并忽略了原始问题的上下文。
ES2018回答
自Node 11.14.0起,可读流支持异步迭代器。
const buffers = [];

// node.js readable streams implement the async iterator protocol
for await (const data of readableStream) {
  buffers.push(data);
}

const finalBuffer = Buffer.concat(buffers);

奖励:未来,这可能会随着第三阶段的Array.fromAsync提案而变得更好。
//  DOES NOT WORK (yet!)
const finalBuffer = Buffer.concat(await Array.fromAsync(readableStream));

在其他答案中提出的事件驱动方案,是否比使用迭代器更可取?实际上,在查看其他建议之前,我已经采用了迭代器方法,然后发现这个 SO 问题中大多数答案都建议采用事件驱动方案。你认为呢? - Metro Smurf
是的,使用这种方法不会比事件更糟糕。大多数回答都是在此功能存在之前编写的。我认为这非常符合该语言的习惯用法,但仍有许多 JS 开发人员不知道它,因此最好的方法可能是您团队最了解的方法。 - Rico Kahler
1
函数在第一个块之后崩溃,没有响应,也没有错误...可能是什么问题?任何帮助将不胜感激。 - ISAE

10
如果您需要从http(s)的URI中拉取数据,您可以轻松地使用node-fetch完成此操作。
自自述文件中得知:
fetch('https://assets-cdn.github.com/images/modules/logos_page/Octocat.png')
    .then(res => res.buffer())
    .then(buffer => console.log)

3
你也可以滥用 node-fetch 中的 Response,从任何流中获取缓冲区而不仅仅是 http:new Response(stream).buffer() - haansn08
1
Response.buffer 不是一个函数。那么...怎么办? 编辑:Response.arrayBuffer 似乎可以工作。 - Frustrated programmer

6

您可以将可读流转换为缓冲区,并以异步方式集成到您的代码中,如下所示。

async streamToBuffer (stream) {
    return new Promise((resolve, reject) => {
      const data = [];

      stream.on('data', (chunk) => {
        data.push(chunk);
      });

      stream.on('end', () => {
        resolve(Buffer.concat(data))
      })

      stream.on('error', (err) => {
        reject(err)
      })
   
    })
  }

使用方法非常简单:

 // usage
  const myStream // your stream
  const buffer = await streamToBuffer(myStream) // this is a buffer

在Windows上无法使用process.stdin - 如果没有将任何输入导入到命令中,则enderror都不会触发。 - mindplay.dk

5
我建议使用loganfsmyths的方法,使用数组来存储数据。
var bufs = [];
stdout.on('data', function(d){ bufs.push(d); });
stdout.on('end', function(){
  var buf = Buffer.concat(bufs);
}

在我的当前工作示例中,我正在使用GRIDfs和npm的Jimp。

   var bucket = new GridFSBucket(getDBReference(), { bucketName: 'images' } );
    var dwnldStream = bucket.openDownloadStream(info[0]._id);// original size
  dwnldStream.on('data', function(chunk) {
       data.push(chunk);
    });
  dwnldStream.on('end', function() {
    var buff =Buffer.concat(data);
    console.log("buffer: ", buff);
       jimp.read(buff)
.then(image => {
         console.log("read the image!");
         IMAGE_SIZES.forEach( (size)=>{
         resize(image,size);
         });
});

我用了一个字符串方法,但没有成功,可能是因为我正在读取图像文件,但是数组方法确实可行。

const DISCLAIMER = "DONT DO THIS";
var data = "";
stdout.on('data', function(d){ 
           bufs+=d; 
         });
stdout.on('end', function(){
          var buf = Buffer.from(bufs);
          //// do work with the buffer here

          });

当我执行字符串方法时,npm jimp 报了以下错误。
buffer:  <Buffer 00 00 00 00 00>
{ Error: Could not find MIME for Buffer <null>

基本上,我认为从二进制到字符串的类型强制转换并不完全有效。

4

您可以通过以下方式实现:

async function toBuffer(stream: ReadableStream<Uint8Array>) {
  const list = []
  const reader = stream.getReader()
  while (true) {
    const { value, done } = await reader.read()
    if (value)
      list.push(value)
    if (done)
      break
  }
  return Buffer.concat(list)
}

或者使用缓冲消费者

const buf = buffer(stream)

你确定这个能用吗?我不认为它会读取整个流。"ReadableStreamDefaultReader接口的read()方法返回一个Promise,提供对流内部队列中下一块的访问。" https://dev59.com/FGYq5IYBdhLWcg3wtCvO - killdash9
@killdash9 我已更新我的答案以连接所有块。谢谢。 - shtse8

1
我只是想发布我的解决方案。以前的答案对我的研究非常有帮助。我使用长度流来获取流的大小,但问题在于回调在流接近末尾时触发,所以我还使用流缓存来缓存流并将其管道传输到res对象,一旦我知道内容长度。如果出现错误,
var StreamCache = require('stream-cache');
var lengthStream = require('length-stream');

var _streamFile = function(res , stream , cb){
    var cache = new StreamCache();

    var lstream = lengthStream(function(length) {
        res.header("Content-Length", length);
        cache.pipe(res);
    });

    stream.on('error', function(err){
        return cb(err);
    });

    stream.on('end', function(){
        return cb(null , true);
    });

    return stream.pipe(lstream).pipe(cache);
}

1

我建议使用缓冲区数组,并在最后仅将其连接到结果缓冲区。这很容易手动完成,或者可以使用node-buffers


1
在 TypeScript 中,[].push(bufferPart) 不兼容;因此:
getBufferFromStream(stream: Part | null): Promise<Buffer> {
    if (!stream) {
        throw 'FILE_STREAM_EMPTY';
    }
    return new Promise(
        (r, j) => {
            let buffer = Buffer.from([]);
            stream.on('data', buf => {
               buffer = Buffer.concat([buffer, buf]);
            });
            stream.on('end', () => r(buffer));
            stream.on('error', j);
        }
    );
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接