如何等待一个流传输完成?(Nodejs)

64

我有一个包含 Promise 的 for 循环数组,因此我使用 Promise.all 来遍历它们,并在之后调用 then 方法。

let promises = [];
promises.push(promise1);
promises.push(promise2);
promises.push(promise3);

Promise.all(promises).then((responses) => {
  for (let i = 0; i < promises.length; i++) {
    if (promise.property === something) {
      //do something
    } else {
      let file = fs.createWriteStream('./hello.pdf');
      let stream = responses[i].pipe(file);
      /*
         I WANT THE PIPING AND THE FOLLOWING CODE 
         TO RUN BEFORE NEXT ITERATION OF FOR LOOP
      */
      stream.on('finish', () => {
        //extract the text out of the pdf
        extract(filePath, {splitPages: false}, (err, text) => {
        if (err) {
          console.log(err);
        } else {
          arrayOfDocuments[i].text_contents = text;
        }
      });
    });    
  }
}

promise1,promise2和promise3是一些HTTP请求。如果其中一个请求的响应类型是application/pdf,则将其写入流中并解析其中的文本。但是,这段代码在解析PDF文本之前就开始了下一次迭代。有没有办法使代码在移动到下一次迭代之前等待流传输和文本提取完成?

5个回答

46

没有async/await,会变得非常麻烦。有了async/await,只需要这样做:

Promise.all(promises).then(async (responses) => {
  for (...) {
    await new Promise(resolve => stream.on("finish", resolve));
    //extract the text out of the PDF
  }
})

18
请注意:finish 事件只有在调用方正确处理流时才会触发。如果没有正确处理(例如 AWS SDK S3 上传),则可以使用 close 事件代替以避免 await 永远挂起。 - Malvineous
你能帮我理解return new Promise()await new Promise()之间的区别吗?在你的代码示例中,我相信前者不会产生暂停程序执行的期望效果,直到每个迭代的完成事件都已触发,而后者则会。为什么会这样呢?(在我的特定情况下,我正在循环中使用readable.pipe(writable),并发现除非我将管道包装在await new Promise()中,否则迭代不会暂停) - user1063287
我不确定你在问什么。但是在循环内调用 "await" 将暂停循环的执行,直到承诺解决,即直到 pipe 完成。因此,您将按顺序一个接一个地传输流。 - Sarsaparilla
不监听流的错误事件是不好的实践。 - sean
非常感谢...我有一些看似简单的代码,但由于流没有刷新而无法工作。经过数小时的尝试,我找到了解决方法并使其正常工作。干杯!z - J.Z.

18

类似以下内容也可以使用。我经常使用这个模式:

let promises = [];
promises.push(promise1);
promises.push(promise2);
promises.push(promise3);

function doNext(){
  if(!promises.length) return;
  promises.shift().then((resolved) =>{
    if(resolved.property === something){
      ...
      doNext();
    }else{
      let file = fs.createWriteStream('./hello.pdf');
      let stream = resolved.pipe(file);
      stream.on('finish', () =>{
        ...
        doNext();
      });
    }

  })
}
doNext();

或者将处理程序拆分为控制器和Promisified处理程序:

function streamOrNot(obj){
  return new Promise(resolve, reject){
    if(obj.property === something){
      resolve();
      return;
    }
    let file = fs.createWriteStream...;
    stream.on('finish', () =>{
      ...
      resolve();
    });
  }
}

function doNext(){
  if(!promises.length) return;
  return promises.shift().then(streamOrNot).then(doNext);
}

doNext()

在我看来,这是最好的答案。在这种情况下,使用 Promise 过于复杂,而且会限制(强制同步,而这并不在 OP 中)。 - Jamie Nicholl-Shelley
2
不监听流错误事件是不好的编程习惯。 - sean
1
确实。这是一个尚未完全开发成生产代码的技术示例。特别是实现者需要决定是在出现错误时拒绝还是处理它们并让所有其他流程完成。 - bknights

10

使用 stream.pipeline()await 替代 stream.pipe()

import * as StreamPromises from "stream/promises";

...
await StreamPromises.pipeline(sourceStream, destinationStream);

2
很遗憾,这只适用于Node 16+,那些仍需要14的人必须将pipeline方法promisify化。 - Justin Grote
发现子管道的问题。例如,假设我有Stream1、Stream2、StreamA和StreamB,其中StreamB.on('finish', () => console.log('Done 1'); StreamAB = StreamA.pipe(StreamB); 然后我执行await StreamPromises.pipeline([Stream1, Stream2, StreamAB]); console.log('Done 2'); 这将先打印出Done 2,然后再打印出Done 1,而不是等待子管道完成。 - Sam Araiza

4

您可以将else部分写在一个自调用函数内。这样,流的处理就可以并行进行。

(function(i) {
    let file = fs.createWriteStream('./hello.pdf');
    let stream = responses[i].pipe(file);
  /*
     I WANT THE PIPING AND THE FOLLOWING CODE 
     TO RUN BEFORE NEXT ITERATION OF FOR LOOP
  */
    stream.on('finish', () => {
      //extract the text out of the pdf
      extract(filePath, {splitPages: false}, (err, text) => {
      if (err) {
        console.log(err);
      } 
      else {
        arrayOfDocuments[i].text_contents = text;
      }
    });
  });    
})(i) 

或者你可以将流处理作为原始/个体承诺的一部分来处理。

目前,您正在创建承诺并将其添加到数组中,而不是将promise.then添加到数组中(也是一个promise)。在then的处理程序内部,执行您的流式处理操作。


这并没有在下一次迭代之前完成流。我仍然会发现 promise3 在 promise2 完成向流中写入之前被调用。 - ThePumpkinMaster
它不会等待,但当for循环中的下一次迭代发生时,它不会覆盖先前的流,一切都可以并行工作。 - Oxi
如果我想在 for 循环结束后使用该流,该怎么办?我该如何在这里使用 Promise 来实现呢? - ThePumpkinMaster
1
这就是闭包的美妙之处,它可以保留流和任何其他内部变量,即使在 for 循环完成后也是如此。对于 for 循环的每次迭代都会创建不同的流。这意味着,在完成 for 循环后,对于您创建的每个流,都将触发完成事件。 - Oxi

0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接