如何在Node.js中创建双工流?

11

我可以这样创建一个新的双工流

const Duplex = require('stream').Duplex;
let myStream = new Duplex()
通过Websocket接收到数据块/缓冲区,每当通过Websocket接收到新的数据块时,我会像这样将其添加到流中: myStream.push(buffer) 然后我将流传输到另一个进程(在此示例中为ffmpeg) myStream.pipe(process.stdout); 这导致了错误NodeError: The _read() method is not implemented,我能理解这个错误,但我不理解为什么要实现它以及如何实现它。我还看到在Duplex类构造函数中可以传递读取函数,但这是为什么呢?我只想不断地向流中推送数据块,然后将其传输到另一个进程。
1个回答

12

Node.js的Duplex流要求实现者同时指定写和读方法:

import stream from 'stream';

const duplex = new stream.Duplex({
  write: (chunk, encoding, next) {
    // Do something with the chunk and then call next() to indicate 
    // that the chunk has been processed. The write() fn will handle
    // data piped into this duplex stream. After the write() has
    // finished, the data will be processed by the read() below.
    next();
  },
  read: ( size ) {
    // Add new data to be read by streams piped from this duplex
    this.push( "some data" )
  }
})

这里提供官方的nodejs流文档:流实现者的API

WebSocket场景
上述描述的WebSocket示例应该使用Readable流而不是双工流(duplex stream)。双工流在存储转发或处理转发场景中很有用。但是,似乎WebSocket示例中的流仅用于将数据从WebSocket移动到流接口。可以使用一个Readable流来实现这一目标:


import stream from 'stream';

const onSocketConnection = ( socket ) => {
    const readable = new stream.Readable({
      // The read logic is omitted since the data is pushed to the socket
      // outside of the script's control. However, the read() function 
      // must be defined.
      read(){}
    });

    socket.on('message', ( data ) => {
        // Push the data on the readable queue
        readable.push( data );
    });

    readable.pipe( ffmpeg );
}

我明白了,感谢您的解释!但我仍然想知道为什么你应该在read()中使用push。当你不断地向一个可读流写入(push)数据时,那感觉就像是一个可读+可写流。那和双工流(Duplex)有什么区别呢?我不确定为什么我所拥有的示例代码使用了Duplex流,但它只是将流传输到sox,然后再传输到ffmpeg,但如果我使用一个像你描述的可读流,那应该也能正常工作。 - curiousMind
进行“push”意味着将数据发送到管道中,而当其他组件在您的可读信号上调用“read”时,表示相应的数据汇已准备好接收更多数据。我认为,在这种情况下选择可读流的最简单原因是,您不希望引入双工会引入的两个缓冲区(写入和读取)-只需要读取缓冲区(这是推送数据的位置)。就我个人而言,只有在使用流API一段时间后,我才能确定何时使用双工/转换以及何时使用其他内容。 - jorgenkg
1
好的,我理解得没错吧,我的流是“可读的”,因为“某人”通过.pipe从中读取,但它不是可写的,因为我自己定义了如何填充读缓冲区(通过readable.push)?我的困惑在于命名,因为我最初认为“读取”函数读取进入流中的数据 - 这不是情况,因为当有人想要从中获取数据时,它是被调用的函数。但是以某种方式,我的可读流又有点“可写”,因为我可以从外部执行自定义的push,对吧?我向流中写入了一些东西 - 这让我感到困惑! - curiousMind
是的!说得好。我最初也误解了文档 :) - jorgenkg

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接