如何将Node.js流的内容读入字符串变量?

229

如何将来自Node.js流的所有数据收集到字符串中?


你应该复制流或使用(autoClose: false)标记。污染内存是不好的实践。 - 19h
21个回答

5

流(Streams)没有简单的.toString()函数(我理解这一点),也没有像.toStringAsync(cb)这样的函数(我不太理解)。所以我自己创建了一个辅助函数:

var streamToString = function(stream, callback) {
  var str = '';
  stream.on('data', function(chunk) {
    str += chunk;
  });
  stream.on('end', function() {
    callback(str);
  });
}

// how to use:
streamToString(myStream, function(myStr) {
  console.log(myStr);
});

5
使用Node.js内置的streamConsumers.text是最简单的方法:
import { text } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const readable = Readable.from('Hello world from consumers!');
const string = await text(readable);

3

即使这个答案是10年前写的,我认为添加我的答案很重要,因为有一些流行的答案没有考虑到Node.js的官方文档(https://nodejs.org/api/stream.html#readablesetencodingencoding)中所说的:

可读流将正确处理通过流传递的多字节字符,否则如果仅作为缓冲区对象从流中拉出,则会变成不正确的解码。

这就是我要修改两个最受欢迎的答案并展示最佳编码过程的原因:

function streamToString(stream) {
    stream.setEncoding('utf-8'); // do this instead of directly converting the string
    const chunks = [];
    return new Promise((resolve, reject) => {
        stream.on('data', (chunk) => chunks.push(chunk));
        stream.on('error', (err) => reject(err));
        stream.on('end', () => resolve(chunks.join("")));
    })
}

const result = await streamToString(stream)

或者:

async function streamToString(stream) {
    stream.setEncoding('utf-8'); // do this instead of directly converting the string
    // input must be stream with readable property
    const chunks = [];

    for await (const chunk of stream) {
        chunks.push(chunk);
    }

    return chunks.join("");
}

2
最干净的解决方案可能是使用 "string-stream" 包,它将流转换为一个带有承诺的字符串。
const streamString = require('stream-string')

streamString(myStream).then(string_variable => {
    // myStream was converted to a string, and that string is stored in string_variable
    console.log(string_variable)

}).catch(err => {
     // myStream emitted an error event (err), so the promise from stream-string was rejected
    throw err
})

2
所有列出的答案似乎都在流动模式下打开可读流,这不是 NodeJS 的默认模式,可能存在限制,因为它缺乏 NodeJS 在暂停可读流模式下提供的反压支持。以下是使用 Just Buffers、本地流和本地流转换实现的支持对象模式的代码:
import {Transform} from 'stream';

let buffer =null;    

function objectifyStream() {
    return new Transform({
        objectMode: true,
        transform: function(chunk, encoding, next) {

            if (!buffer) {
                buffer = Buffer.from([...chunk]);
            } else {
                buffer = Buffer.from([...buffer, ...chunk]);
            }
            next(null, buffer);
        }
    });
}

process.stdin.pipe(objectifyStream()).process.stdout

2

那么像流缩减器这样的东西怎么样?

以下是使用ES6类的示例,说明如何使用流缩减器。

var stream = require('stream')

class StreamReducer extends stream.Writable {
  constructor(chunkReducer, initialvalue, cb) {
    super();
    this.reducer = chunkReducer;
    this.accumulator = initialvalue;
    this.cb = cb;
  }
  _write(chunk, enc, next) {
    this.accumulator = this.reducer(this.accumulator, chunk);
    next();
  }
  end() {
    this.cb(null, this.accumulator)
  }
}

// just a test stream
class EmitterStream extends stream.Readable {
  constructor(chunks) {
    super();
    this.chunks = chunks;
  }
  _read() {
    this.chunks.forEach(function (chunk) { 
        this.push(chunk);
    }.bind(this));
    this.push(null);
  }
}

// just transform the strings into buffer as we would get from fs stream or http request stream
(new EmitterStream(
  ["hello ", "world !"]
  .map(function(str) {
     return Buffer.from(str, 'utf8');
  })
)).pipe(new StreamReducer(
  function (acc, v) {
    acc.push(v);
    return acc;
  },
  [],
  function(err, chunks) {
    console.log(Buffer.concat(chunks).toString('utf8'));
  })
);

1

setEncoding('utf8');

Sebastian J 做得很好。

我有几行测试代码出现了“缓冲区问题”,添加编码信息后解决了,如下所示。

演示问题

软件

// process.stdin.setEncoding('utf8');
process.stdin.on('data', (data) => {
    console.log(typeof(data), data);
});
hello world

输出

object <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64 0d 0a>

展示解决方案
软件
process.stdin.setEncoding('utf8'); // <- Activate!
process.stdin.on('data', (data) => {
    console.log(typeof(data), data);
});
hello world

输出

string hello world

0
使用您可能已经在项目依赖项中拥有的相当受欢迎的stream-buffers,这非常简单:
// imports
const { WritableStreamBuffer } = require('stream-buffers');
const { promisify } = require('util');
const { createReadStream } = require('fs');
const pipeline = promisify(require('stream').pipeline);

// sample stream
let stream = createReadStream('/etc/hosts');

// pipeline the stream into a buffer, and print the contents when done
let buf = new WritableStreamBuffer();
pipeline(stream, buf).then(() => console.log(buf.getContents().toString()));

0

这对我有用,基于Node v6.7.0文档:

let output = '';
stream.on('readable', function() {
    let read = stream.read();
    if (read !== null) {
        // New stream data is available
        output += read.toString();
    } else {
        // Stream is now finished when read is null.
        // You can callback here e.g.:
        callback(null, output);
    }
});

stream.on('error', function(err) {
  callback(err, null);
})

0
在我的情况下,内容类型响应头是Content-Type: text/plain。因此,我从缓冲区读取数据的方式如下:
let data = [];
stream.on('data', (chunk) => {
 console.log(Buffer.from(chunk).toString())
 data.push(Buffer.from(chunk).toString())
});

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接