从两个已连接的流中创建一个Node.js流

20

如果可能的话,我想将两个Node.js流通过管道合并成一个。我正在使用Transform流。

换句话说,我希望我的库返回myStream供人们使用。例如,他们可以这样写:

process.stdin.pipe(myStream).pipe(process.stdout);

我在内部使用一个第三方vendorStream来完成一些工作,将其插入到包含在myInternalStream中的自己的逻辑中。因此,上面的内容应该翻译为:

process.stdin.pipe(vendorStream).pipe(myInternalStream).pipe(process.stdout);

我能像那样做吗?我已经尝试过 var myStream = vendorStream.pipe(myInternalStream),但显然不起作用。

bash 做个类比,假设我想编写一个程序来检查某个流的最后一行是否存在字母 htail -n 1 | grep h),我可以编写一个 shell 脚本:

# myscript.sh
tail -n 1 | grep h

然后如果人们这样做:

$ printf "abc\ndef\nghi" | . myscript.sh

它只是有效的。

这是我迄今为止所拥有的:

// Combine a pipe of two streams into one stream

var util = require('util')
  , Transform = require('stream').Transform;

var chunks1 = [];
var stream1 = new Transform();
var soFar = '';
stream1._transform = function(chunk, encoding, done) {
  chunks1.push(chunk.toString());
  var pieces = (soFar + chunk).split('\n');
  soFar = pieces.pop();
  for (var i = 0; i < pieces.length; i++) {
    var piece = pieces[i];
    this.push(piece);
  }
  return done();
};

var chunks2 = [];
var count = 0;
var stream2 = new Transform();
stream2._transform = function(chunk, encoding, done) {
  chunks2.push(chunk.toString());
  count = count + 1;
  this.push(count + ' ' + chunk.toString() + '\n');
  done();
};

var stdin = process.stdin;
var stdout = process.stdout;

process.on('exit', function () {
    console.error('chunks1: ' + JSON.stringify(chunks1));
    console.error('chunks2: ' + JSON.stringify(chunks2));
});
process.stdout.on('error', process.exit);


// stdin.pipe(stream1).pipe(stream2).pipe(stdout);

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc\nd","ef\nghi\n"]
// chunks2: ["abc","def","ghi"]

// Best working solution I could find
var stream3 = function(src) {
  return src.pipe(stream1).pipe(stream2);
};
stream3(stdin).pipe(stdout);

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc\nd","ef\nghi\n"]
// chunks2: ["abc","def","ghi"]

这是否有可能呢?如果我的意图不明确,请告诉我。

谢谢!

2个回答

30
您可以监视流中是否有内容,然后将其 unpipe 并重新传输到您感兴趣的流中:

你可以观察一个要被传送到你的流中的东西,然后使用unpipe将其取消传送,并传输到你感兴趣的流中:

注:为了更好地符合中文语法和习惯,我稍微修改了原句的表达方式。
var PassThrough = require('stream').PassThrough;

var stream3 = new PassThrough();

// When a source stream is piped to us, undo that pipe, and save
// off the source stream piped into our internally managed streams.
stream3.on('pipe', function(source) {
  source.unpipe(this);
  this.transformStream = source.pipe(stream1).pipe(stream2);
});

// When we're piped to another stream, instead pipe our internal
// transform stream to that destination.
stream3.pipe = function(destination, options) {
  return this.transformStream.pipe(destination, options);
};

stdin.pipe(stream3).pipe(stdout);

您可以将此功能提取到自己的可构建流类中:

var util = require('util');
var PassThrough = require('stream').PassThrough;

var StreamCombiner = function() {
  this.streams = Array.prototype.slice.apply(arguments);

  this.on('pipe', function(source) {
    source.unpipe(this);
    for(i in this.streams) {
      source = source.pipe(this.streams[i]);
    }
    this.transformStream = source;
  });
};

util.inherits(StreamCombiner, PassThrough);

StreamCombiner.prototype.pipe = function(dest, options) {
  return this.transformStream.pipe(dest, options);
};

var stream3 = new StreamCombiner(stream1, stream2);
stdin.pipe(stream3).pipe(stdout);

非常感谢@brandon,这太棒了!我更新了我的Gist https://gist.github.com/nicolashery/5910969 - Nicolas Hery
太棒了。我也在考虑做类似的事情,但我就是缺乏信心,担心会有一些微妙的细节被忽略,从而导致我的解决方案是错误的。谢谢你给我带来的信心。 - FellowMD
顺便提一下,为了使这个解决方案起作用,您需要将stream3管道连接到源(在本例中是stdin),然后再将其管道连接到stdout。因此,不要使用stream3.pipe(stdout); stream3.write(data); 但这真的很有帮助!谢谢! - Robert Balicki
原来,stream3是一个转换流,所以它没有write方法。无论如何。 - Robert Balicki
当您将多个流管道传输到同一个“StreamCombiner”对象中时,这不会出现问题吗?我认为应该在构造函数中创建“this.transformStream”,而不是在每个“pipe”事件上被覆盖。 - geon
2
有没有提供这种功能的库? - Simon B.Robert

4

也许有一种选择是使用multipipe,它允许您将多个转换链接在一起,作为单个转换流进行包装:

// my-stream.js
var multipipe = require('multipipe');

module.exports = function createMyStream() {
  return multipipe(vendorStream, myInternalStream);
};

然后您可以这样做:
var createMyStream = require('./my-stream');

var myStream = createMyStream();

process.stdin.pipe(myStream).pipe(process.stdout);

澄清:这使得标准输入经过vendorStream,然后通过myInternalStream最终输出到标准输出。

你显然也可以使用 https://www.npmjs.com/package/lazypipe,它具有稍微不同的API。 - Jonas Berlin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接