如何在Node.Js中从字符串创建流?

267

我正在使用一个库,ya-csv,它需要文件或流作为输入,但是我有一个字符串。

在Node中,如何将该字符串转换为流?

12个回答

218

正如@substack#node中纠正我的一样,Node v10 中的新 流 API使得这更容易:

const Readable = require('stream').Readable;
const s = new Readable();
s._read = () => {}; // redundant? see update below
s.push('your text here');
s.push(null);

之后,您可以自由地传输它或以其他方式将其传递给您计划的消费者。

这不如resumer一句话的处理干净,但它确实避免了额外的依赖。

更新:目前在v0.10.26到v9.2.1中,如果您没有设置_read,直接从REPL提示符调用push将导致not implemented异常。它不会在函数或脚本内崩溃。如果不一致让您紧张,包括noop。)


6
所有可读流的实现都必须提供一个 _read 方法来从底层资源获取数据。【注:此为 Node.js 官方文档中某部分内容的翻译】 - Felix Rabe
2
@eye_mew 你需要先 require('stream')。 - Jim Jones
11
为什么要将null推入流的缓冲区? - dopatraman
7
null 表示流已经读取完所有数据并关闭了流。 - chrishiestand
3
看起来你不应该用这种方式做。引用文档:“readable.push()方法仅适用于Readable的实现者,并且只能从readable._read()方法内部调用。” - Axel Rauschmayer
显示剩余6条评论

164

不要使用Jo Liss的回答。在大多数情况下,它可能有效,但在我的情况下,它让我花了整整4到5个小时来查找错误。没有必要使用第三方模块来完成这个任务。

新的回答

var Readable = require('stream').Readable

var s = new Readable()
s.push('beep')    // the string you want
s.push(null)      // indicates end-of-file basically - the end of the stream

这应该是一个完全符合要求的可读流。点击这里了解如何正确使用流的更多信息。 旧答案: 只需使用原生的PassThrough流即可。
var stream = require("stream")
var a = new stream.PassThrough()
a.write("your string")
a.end()

a.pipe(process.stdout) // piping will work as normal
/*stream.on('data', function(x) {
   // using the 'data' event works too
   console.log('data '+x)
})*/
/*setTimeout(function() {
   // you can even pipe after the scheduler has had time to do other things
   a.pipe(process.stdout) 
},100)*/

a.on('end', function() {
    console.log('ended') // the end event will be called properly
})

请注意,'close'事件不会被触发(这并不是流接口所要求的)。

2
@Finn 如果 JavaScript 中没有参数,你不需要使用括号。 - B T
2
在2018年不要使用"var",而是使用"const"。 - stackdave

146

从node 10.17版本开始,stream.Readable有一个from方法,可以轻松地从任何可迭代对象(包括数组字面量)创建流:

const { Readable } = require("stream")

const readable = Readable.from(["input string"])

readable.on("data", (chunk) => {
  console.log(chunk) // will be called once with `"input string"`
})

请注意,在至少10.17和12.3之间,字符串本身是可迭代的,因此Readable.from("input string")将有效,但会每个字符发出一个事件。Readable.from(["input string"])将为数组中的每个项(在此情况下为一个项)发出一个事件。
还要注意,在较新的节点(可能是12.3,因为文档说函数在那时被更改)中,不再需要将字符串包装在数组中。

https://nodejs.org/api/stream.html#stream_stream_readable_from_iterable_options


2
根据 stream.Readable.from 的说明,调用 Readable.from(string) 或 Readable.from(buffer) 不会迭代字符串或缓冲区以匹配其他流的语义,这是出于性能方面的考虑。 - abbr
1
我的错。该函数在10.7中添加,并且按照我最初描述的方式运行。自那时以来,字符串不再需要包装在数组中(自12.3开始,它不再逐个迭代每个字符)。 - Fizker

35

只需创建一个 stream 模块的新实例并根据您的需要进行自定义:

var Stream = require('stream');
var stream = new Stream();

stream.pipe = function(dest) {
  dest.write('your string');
  return dest;
};

stream.pipe(process.stdout); // in this case the terminal, change to ya-csv
或者
var Stream = require('stream');
var stream = new Stream();

stream.on('data', function(data) {
  process.stdout.write(data); // change process.stdout to ya-csv
});

stream.emit('data', 'this is my string');

13
这段代码违反了流的惯例。pipe()应该至少返回目标流。 - greim
2
如果使用此代码,则不会调用结束事件。这不是创建可通常使用的流的好方法。 - B T

12

编辑: Garth的回答 可能更好。

我的旧回答内容保留如下。


要将字符串转换为流,您可以使用一个已暂停的through 流:

through().pause().queue('your string').end()

例子:

var through = require('through')

// Create a paused stream and buffer some data into it:
var stream = through().pause().queue('your string').end()

// Pass stream around:
callback(null, stream)

// Now that a consumer has attached, remember to resume the stream:
stream.resume()

我无法将zeMirco的解决方案用于我的用例,但是resumer效果非常好。谢谢! - mpen
@substack 的恢复建议对我非常有效。谢谢! - Garth Kidd
2
Resumer很棒,但是“在nextTick上自动恢复流”可能会带来一些意外,如果你期望将流传递给未知的消费者!我有一些代码,如果元数据的数据库保存成功,则将内容流传输到文件。这是一个潜在的错误,当db写立即返回成功时,它偶然成功了!后来我重构了一些东西,放在了一个异步块中,结果,流就再也无法读取了。教训:如果你不知道谁会消费你的流,请坚持使用through().pause().queue('string').end()技术。 - Jolly Roger
2
我花了大约5个小时来调试我的代码,因为我使用了这个答案的resumer部分。如果你能把它删除掉就太好了。 - B T

11

1
这是在“有一个应用程序可以解决这个问题”的基础上开的玩笑吗? ;) - masterxilo
1
评论中的链接非常有用:https://www.npmjs.com/package/string-to-stream - Dem Pilafian
FYI,我尝试使用这个库将JSON写入Google Drive,但它对我无效。在这里写了一篇文章:https://medium.com/@dupski/nodejs-creating-a-readable-stream-from-a-string-e0568597387f。同时也作为下面的答案添加了进来。 - Russell Briggs

9
另一个解决方案是将读取函数传递给Readable的构造函数(参见文档 流可读选项)。
var s = new Readable({read(size) {
    this.push("your string here")
    this.push(null)
  }});

你可以使用 s.pipe 作为示例。

在最后使用return的目的是什么? - Kirill Reznikov
始终返回某些东西(或什么都不返回),这是来自文档的示例。 - Philippe T.
在JS中,如果一个函数没有返回值,它相当于一个空的返回值。请问您能提供一下您找到这个信息的链接吗? - Kirill Reznikov
你应该写上。我说这话更多是为了最佳实践。我想要返回空值,这不是一个错误。所以我删除了这行代码。 - Philippe T.

6

在 CoffeeScript 中:

class StringStream extends Readable
  constructor: (@str) ->
    super()

  _read: (size) ->
    @push @str
    @push null

使用它:

new StringStream('text here').pipe(stream1).pipe(stream2)

5
我厌倦了每隔六个月都要重新学习这个,所以我发布了一个npm模块来抽象掉实现细节: https://www.npmjs.com/package/streamify-string 这是该模块的核心:
const Readable = require('stream').Readable;
const util     = require('util');

function Streamify(str, options) {

  if (! (this instanceof Streamify)) {
    return new Streamify(str, options);
  }

  Readable.call(this, options);
  this.str = str;
}

util.inherits(Streamify, Readable);

Streamify.prototype._read = function (size) {

  var chunk = this.str.slice(0, size);

  if (chunk) {
    this.str = this.str.slice(size);
    this.push(chunk);
  }

  else {
    this.push(null);
  }

};

module.exports = Streamify;

str是在调用构造函数时必须传递的string,并将作为数据由流输出。 options是可以传递给流的典型选项,根据文档

根据Travis CI的说法,它应该与大多数Node版本兼容。


2
当我最初发布这篇文章时,我没有包含相关的代码,被告知这是不受欢迎的。 - Chris Allen Lane

5
这是TypeScript中一个整洁的解决方案:
import { Readable } from 'stream'

class ReadableString extends Readable {
    private sent = false

    constructor(
        private str: string
    ) {
        super();
    }

    _read() {
        if (!this.sent) {
            this.push(Buffer.from(this.str));
            this.sent = true
        }
        else {
            this.push(null)
        }
    }
}

const stringStream = new ReadableString('string to be streamed...')

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接