实现一个缓冲转换流

21
我正在尝试使用新的Node.js流API实现一个流,它将缓冲一定量的数据。当这个流被管道传输到另一个流中,或者如果某些东西消耗了readable事件,则此流应该刷新其缓冲区,然后简单地变为通行证。问题在于,这个流将被传输到许多其他流中,当每个目标流被连接时,必须刷新缓冲区,即使它已经刷新到另一个流中。即使它已经刷新到另一个流中

例如:

  1. BufferStream 实现了 stream.Transform,并保持一个 512KB 的内部环形缓冲区。
  2. ReadableStreamA 被连接到一个 BufferStream 实例上。
  3. BufferStreamReadableStreamA 中读取数据,并将其写入环形缓冲区中(不会有数据丢失,因为缓冲区会覆盖旧数据)。
  4. BufferStream 被连接到 WritableStreamB 上。
  5. WritableStreamB 接收整个 512KB 缓冲区,并在 BufferStream 中有数据写入时继续接收数据。
  6. BufferStream 被连接到 WritableStreamC 上。
  7. WritableStreamC 同样接收整个 512KB 缓冲区,但此时缓冲区已经与 WritableStreamB 所接收到的不同,因为缓冲区中有更多的数据被写入了。
这是否可以使用流API实现?我所能想到的唯一方法是创建一个对象,该对象具有一种方法,为每个目标启动新的PassThrough流,这意味着我不能简单地将其导入和导出。值得一提的是,我已经用旧的“流动”API完成了这个过程,只需在数据事件上监听新的处理程序。当使用.on('data')附加新函数时,我会直接使用环形缓冲区的副本来调用它。

一个疑问:数据是只以512KB的突发方式传输,还是只有第一个突发是512KB? - user568109
@user568109 当某个东西开始从缓冲流接收数据时,它应该接收最初的512KB缓冲区(仅一次),然后将继续在数据可用时通过缓冲流接收数据。只有第一个块是512KB(或缓冲区大小)。 - Brad
2个回答

8
这是我对你的问题的看法。
基本思路是创建一个Transform流,这将允许我们在将数据发送到流的输出之前执行自定义缓冲逻辑:
var util = require('util')
var stream = require('stream')

var BufferStream = function (streamOptions) {
  stream.Transform.call(this, streamOptions)
  this.buffer = new Buffer('')
}

util.inherits(BufferStream, stream.Transform)

BufferStream.prototype._transform = function (chunk, encoding, done) {
  // custom buffering logic
  // ie. add chunk to this.buffer, check buffer size, etc.
  this.buffer = new Buffer(chunk)

  this.push(chunk)
  done()
}

接下来,我们需要重写.pipe()方法,以便在BufferStream被管道传输到流中时得到通知,从而使我们能够自动向其写入数据:

BufferStream.prototype.pipe = function (destination, options) {
  var res = BufferStream.super_.prototype.pipe.call(this, destination, options)
  res.write(this.buffer)
  return res
}

在这种情况下,当我们编写buffer.pipe(someStream)时,我们按照预期执行管道并将内部缓冲区写入输出流。之后,Transform类会处理所有内容,并跟踪背压等等。
这里是一个有效的代码片段。请注意,我没有费心编写正确的缓冲逻辑(即我不关心内部缓冲区的大小),但这应该很容易修复。

我认为这很接近但不是100%正确。在第一次调用管道之前,转换实现需要将所有内容放入缓冲区,然后一旦调用管道,就切换到调用this.push。 - Alexander Mills
翻译:关于编程的内容,请问“_flush()”方法怎么样? - Alexander Mills

1
保罗的回答很好,但我认为它不符合精确要求。听起来需要发生的是每次在这个转换流上调用pipe()时,它都需要首先清空代表从创建转换流/(连接到源流)到连接到当前可写/目标流之间所有数据累积的缓冲区。可能更正确的做法是这样的:
  var BufferStream = function () {
        stream.Transform.apply(this, arguments);
        this.buffer = []; //I guess an array will do
    };

    util.inherits(BufferStream, stream.Transform);

    BufferStream.prototype._transform = function (chunk, encoding, done) {

        this.push(chunk ? String(chunk) : null);
        this.buffer.push(chunk ? String(chunk) : null);

        done()
    };

    BufferStream.prototype.pipe = function (destination, options) {
        var res = BufferStream.super_.prototype.pipe.apply(this, arguments);
        this.buffer.forEach(function (b) {
            res.write(String(b));
        });
        return res;
    };


    return new BufferStream();

我猜这个意思是:

我认为这个:

BufferStream.super_.prototype.pipe.apply(this, arguments);

这等同于以下内容:

...

stream.Transform.prototype.pipe.apply(this, arguments);

你可能可以优化这个,当调用pipe/unpipe时使用一些标志。

当然,这种方法的问题在于它正在缓冲所有数据,并且它永远不会停止缓冲,因此,除非你小心谨慎,否则这很容易导致内存“泄漏”。对于短暂的程序可能没问题,但对于服务器等则不适用。 - Alexander Mills

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接