在Node.js中,从可写流中暂停已管道化的可读流的正确方法是什么?

18

我正在编写一个模块,它是一个可写流。我想为我的用户实现管道接口。

如果发生错误,我需要暂停可读流并发出错误事件。然后,用户将决定 - 如果他可以处理这个错误,他应该能够恢复数据处理。

var writeable = new BackPressureStream();
writeable.on('error', function(error){
    console.log(error);
    writeable.resume();
});

var readable = require('fs').createReadStream('somefile.txt');
readable.pipe.(writeable);

我看到Node提供了readable.pause()方法,可以用于暂停可读流。但我不知道如何从我自己的可写流模块中调用它:

我想请问如何在我的可写流模块中使用readable.pause()方法?

var Writable = require('stream').Writable;

function BackPressureStream(options) {
    Writable.call(this, options);
}
require('util').inherits(BackPressureStream, Writable);

BackPressureStream.prototype._write = function(chunk, encoding, done) {
    done();
};

BackPressureStream.prototype.resume = function() {
    this.emit('drain');
}
如何在可写流中实现反压?
P.S. 可以使用pipe/unpipe事件,该事件提供可读流作为参数。 但是也有人说,对于已连接的流,暂停的唯一机会是从可写流中取消连接的可读流。
我理解正确吗? 直到用户调用resume之前,我必须取消连接我的可写流吗? 用户调用resume后,我应该将可读流重新连接吗?

1
你有兴趣为这个问题设置赏金吗? - Alexander Mills
1
嘿,你找到了你的问题的答案吗? - Max Koretskyi
3个回答

2
你所描述的功能已经被pipe方法本身实现了。在文档的写入时出错章节中有这样一句话:

如果一个可读流在Writable发生错误时被管道化,那么这个可读流将会被取消管道化。

因此,作为一个可写流的实现者,你唯一需要做的就是实现_write方法并在出现错误时发出错误信号。取消管道化将由Stream模块自动处理。然后,消费者模块的工作就是在他们认为错误不是关键问题时将可读流重新连接到管道中。以下是他们可能会这样做的方式:
var writeable = new BackPressureStream();
var readable = require('fs').createReadStream('somefile.txt');

writeable.on('error', function(error) {
    // use pipe again, if error is not critical
    if (!error.critical) {
        readable.pipe(writeable);
    } else {
        readable.destroy(error);
    }
});

readable.pipe(writeable);

在你的模块内部:

BackPressureStream.prototype._write = function(chunk, encoding, done) {
    // call done with an error to emit 'error' event and unpipe readable stream
    done(new Error('BOOM'));
};

1

现在原生的 NodeJS 流支持反压和缓冲,因此不需要访问或与源流交互。而 pipe() 可以同时处理这两个问题。

你只需要正确实现 _write() 即可。

function _write(chunk, enc, callback) {
    // if you don't invoke callback, data is buffered, and writes paused when buffer is full
}

引用文档:

在调用writable._write()和回调函数之间发生的所有对writable.write()的调用都会导致写入的数据被缓冲。

在转发错误后,直到用户确认继续,不要为下一块调用callback()。这将导致来自源的数据被缓冲。

当重复调用writable.write(chunk)方法时,数据会在可写流中缓冲。只要内部写缓冲区的总大小低于highWaterMark设置的阈值,对writable.write()的调用将返回true。一旦内部缓冲区的大小达到或超过highWaterMark,false将被返回。

在可写流的缓冲区已满后,对write()的调用将返回false。如果源流实现良好或是本机节点流,则它将自动停止write()更多数据。


0

基本上,我理解你的意思是,在出现错误事件的情况下对流进行回压。你有几个选择。

首先,正如你已经发现的那样,使用pipe来获取读取流的实例并进行一些花式操作。

另一个选项是创建一个包装可写流,提供这个功能(即它以WritableStream作为输入,并在实现流函数时将数据传递给提供的流)。

基本上,你最终会得到像这样的东西

源流 -> 包装可写流 -> 可写流

https://nodejs.org/api/stream.html#stream_implementing_a_writable_stream 处理实现可写流的问题。

关键是,如果底层可写流发生错误,你需要在流上设置一个标志,下一次调用write时,你需要缓冲块、存储回调并仅调用。类似于

// ...
constructor(wrappedWritableStream) {
    wrappedWritableStream.on('error', this.errorHandler);
    this.wrappedWritableStream = wrappedWritableStream;
}
// ...
write(chunk, encoding, callback) {
    if (this.hadError) {
        // Note: until callback is called, this function won't be called again, so we will have maximum one stored
        //  chunk.
        this.bufferedChunk = [chunk, encoding, callback];
    } else {
        wrappedWritableStream.write(chunk, encoding, callback);
    }
}
// ...
errorHandler(err) {
    console.error(err);
    this.hadError = err;
    this.emit(err);
}
// ...
recoverFromError() {
    if (this.bufferedChunk) {
        wrappedWritableStream.write(...this.bufferedChunk);
        this.bufferedChunk = undefined;
    }
    this.hadError = false;
}

注意:你只需要实现write函数,但我鼓励你去挖掘和尝试其他的实现函数。
另外值得注意的是,你可能会遇到一些问题,在流发出错误事件时写入数据,但我将把它作为一个单独的问题留给你去解决。
这里有另一个关于后压资源的好资料https://nodejs.org/en/docs/guides/backpressuring-in-streams/

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接