如何使用异步生成器在Node可读流(Readable stream)中进行数据流处理?

4
我想要做类似这样的事情。
const { Readable } = require("stream");

function generatorToStream(generator) {
  return new Readable({
    read() {
      (async () => {
        for await (const result of generator()) {
          if (result.done) {
            this.push(null);
          } else {
            this.push(result.value);
          }
        }
      })();
    }
  });
}

generatorToStream(async function*() {
  const msg1 = await new Promise(resolve =>
    setTimeout(() => resolve("ola amigao"), 2000)
  );
  yield msg1;
  const msg2 = await new Promise(resolve =>
    setTimeout(() => resolve("ola amigao"), 2000)
  );
  yield msg2;

  const msg3 = await new Promise(resolve =>
    setTimeout(() => resolve("ola amigao"), 2000)
  );
  yield msg3;
}).pipe(process.stdout);

但是它没有起作用,结束事件从未被调用,我在终端上也没有接收到任何数据。

有什么解决方案或实现技巧吗?


你是否遇到了任何错误,特别是未处理的 Promise 拒绝?尝试在你的 IIAFE 后面添加 .catch(console.error) - Bergi
如果我没记错的话,当你使用for await of时,result不会是一个迭代记录,而是它本身的值。当生成器完成时,循环就会结束。 - Bergi
一个使用awaitasync函数,每个Promise都单独调用,就像一个生成器一样。它们能否结合在一起呢? - zer00ne
1个回答

2
我是Scramjet的作者,这是一个功能性的流处理框架,可能是您的一个简单解决方案。
如果您希望向您的项目中添加仅3个依赖项,则无需更简单:
const {StringStream} = require("scramjet");

StringStream.from(async function* () {
    yield await something();
    ...
});

如果您想自己实现这个功能,请查看DataStream line 112中的源代码 - 这应该很容易实现。一般来说,您需要实现类似于以下内容的东西:
function generatorToStream(gen) {
    // obtain an iterator
    const iter = await gen();
    // create your output
    const out = new Passthrough();

    // this IIFE will do all the work
    (async () => {
        let done = false;
        for await (let chunk of iter) {
            // if write returns true, continue, otherwise wait until out is drained.
            if (!out.write(chunk)) await new Promise((res, rej) => this.once("drain", res);
        }
    })()
        // handle errors by pushing them to the stream for example
        .catch(e => out.emit('error', e));

    // return the output stream
    return out;
}

以上示例或多或少就是 scramjet 中正在发生的事情——在保持较少事件处理程序等方面进行了一些优化,但在简单情况下,以上方法应该可以很好地工作。

赞扬你在任何事情之前声明你不是该库的作者,隐蔽的自我宣传非常令人沮丧。 - Robin Goupil

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接