如何将来自Node.js流的所有数据收集到字符串中?
如何将来自Node.js流的所有数据收集到字符串中?
流(Streams)没有简单的 .toString() 函数(这一点我可以理解),也没有类似 .toStringAsync(cb) 的函数(这一点我不太理解)。所以我自己写了一个辅助函数:
/**
 * Collects everything a stream emits and hands it to `callback` as one string.
 *
 * Chunks are kept as bytes and decoded once on 'end', so a multi-byte UTF-8
 * character that happens to be split across two chunks is still decoded
 * correctly.
 *
 * No 'error' listener is attached here; error handling is left to the caller.
 *
 * @param {object} stream - Emitter of 'data' and 'end' events (e.g. a Readable).
 * @param {function(string): void} callback - Invoked on 'end' with the full text.
 */
var streamToString = function(stream, callback) {
  var chunks = [];
  stream.on('data', function(chunk) {
    // Anything that is not already a Buffer (e.g. a string chunk) is
    // stringified first and then stored as UTF-8 bytes.
    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(String(chunk)));
  });
  stream.on('end', function() {
    callback(Buffer.concat(chunks).toString('utf8'));
  });
};
// Example call: log the collected text that streamToString passes to the callback.
streamToString(myStream, (myStr) => {
  console.log(myStr);
});
import { text } from 'node:stream/consumers';
import { Readable } from 'node:stream';
// Build a readable stream from a string literal so there is something to consume.
const readable = Readable.from('Hello world from consumers!');
// text() from node:stream/consumers is awaited to obtain the stream's contents as a string.
const string = await text(readable);
即使这个答案是10年前写的,我认为添加我的答案很重要,因为有一些流行的答案没有考虑到Node.js的官方文档(https://nodejs.org/api/stream.html#readablesetencodingencoding)中所说的:
可读流会正确处理通过流传递的多字节字符;如果只是把数据作为 Buffer 对象从流中取出再自行转换,这些字符就可能被错误地解码。
这就是我要修改两个最受欢迎的答案并展示最佳编码过程的原因:
/**
 * Reads a stream to its end and resolves with everything it emitted, joined
 * into one string. Rejects with the error if the stream emits 'error'.
 *
 * @param {object} stream - Stream supporting setEncoding() and 'data'/'end'/'error' events.
 * @returns {Promise<string>} The concatenated chunks.
 */
function streamToString(stream) {
  // Have the stream decode to strings itself rather than stringifying chunks here.
  stream.setEncoding('utf-8');
  return new Promise((resolve, reject) => {
    const parts = [];
    stream.on('data', (part) => {
      parts.push(part);
    });
    stream.on('end', () => {
      resolve(parts.join(""));
    });
    stream.on('error', reject);
  });
}
// Usage: await the promise returned by streamToString to get the collected string.
const result = await streamToString(stream)
或者:
/**
 * Drains a stream with async iteration and returns all of its chunks joined
 * into a single string.
 *
 * @param {object} stream - Must support setEncoding() and async iteration (a readable stream).
 * @returns {Promise<string>} The concatenated chunks.
 */
async function streamToString(stream) {
  const pieces = [];
  // Let the stream produce decoded strings instead of converting chunks by hand.
  stream.setEncoding('utf-8');
  for await (const piece of stream) pieces.push(piece);
  return pieces.join("");
}
const streamString = require('stream-string')
streamString(myStream)
  .then((text) => {
    // Fulfilled: `text` holds the string produced from myStream.
    console.log(text);
  })
  .catch((error) => {
    // Rejected (per the original note: myStream emitted an 'error' event); pass the error on.
    throw error;
  });
import {Transform} from 'stream';
// Bytes accumulated so far. Declared at module level, so every stream returned
// by objectifyStream() appends to this same buffer.
let buffer = null;

/**
 * Creates an object-mode Transform that accumulates its input and, for every
 * incoming chunk, emits a Buffer holding all bytes received so far.
 *
 * @returns {Transform} The accumulating transform stream.
 */
function objectifyStream() {
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, next) {
      let bytes;
      if (typeof chunk === 'string') {
        // Spreading a string yields characters, not byte values, so encode it.
        bytes = Buffer.from(chunk, Buffer.isEncoding(encoding) ? encoding : 'utf8');
      } else if (chunk instanceof Uint8Array) {
        // Buffers are Uint8Arrays and can be concatenated as they are.
        bytes = chunk;
      } else {
        // Any other iterable of byte values.
        bytes = Buffer.from([...chunk]);
      }
      // Buffer.concat copies in bulk; the first chunk is copied as well so the
      // accumulator never aliases the caller's memory.
      buffer = buffer ? Buffer.concat([buffer, bytes]) : Buffer.from(bytes);
      next(null, buffer);
    },
  });
}
// Send stdin through the accumulating transform and on to stdout.
// (pipe() returns its destination, so the second stage must be another pipe() call.)
process.stdin.pipe(objectifyStream()).pipe(process.stdout);
那么像流缩减器这样的东西怎么样?
以下是使用ES6类的示例,说明如何使用流缩减器。
var stream = require('stream')
/**
 * Writable that folds every chunk written to it into one accumulated value and
 * reports that value through a Node-style callback once writing has ended.
 */
class StreamReducer extends stream.Writable {
  /**
   * @param {function(*, *): *} chunkReducer - Called as (accumulator, chunk); its return value becomes the new accumulator.
   * @param {*} initialvalue - Starting value of the accumulator.
   * @param {function(?Error, *): void} cb - Called as cb(null, accumulator) after the last chunk has been reduced.
   */
  constructor(chunkReducer, initialvalue, cb) {
    super();
    this.reducer = chunkReducer;
    this.accumulator = initialvalue;
    this.cb = cb;
  }
  _write(chunk, enc, next) {
    this.accumulator = this.reducer(this.accumulator, chunk);
    next();
  }
  // Report from _final rather than by overriding end(): the inherited end()
  // still writes an optional last chunk and lets the stream emit 'finish'.
  _final(done) {
    this.cb(null, this.accumulator);
    done();
  }
}
// Demo-only readable: pushes every chunk it was constructed with, then signals the end.
class EmitterStream extends stream.Readable {
  constructor(chunks) {
    super();
    this.chunks = chunks;
  }
  _read() {
    // The arrow function keeps `this` bound to the stream, so no bind() is needed.
    this.chunks.forEach((piece) => {
      this.push(piece);
    });
    // null marks the end of the data.
    this.push(null);
  }
}
// Turn the strings into Buffers (the kind of chunk an fs or http stream would
// deliver), pipe them into a reducer that gathers the chunks in an array, and
// print the concatenated result.
new EmitterStream(
  ["hello ", "world !"].map((str) => Buffer.from(str, 'utf8'))
).pipe(
  new StreamReducer(
    (acc, v) => {
      acc.push(v);
      return acc;
    },
    [],
    (err, chunks) => {
      console.log(Buffer.concat(chunks).toString('utf8'));
    }
  )
);
Sebastian J 做得很好。
我有几行测试代码出现了“缓冲区问题”,添加编码信息后解决了,如下所示。
软件
// The encoding call is deliberately left disabled for this run:
// process.stdin.setEncoding('utf8');
process.stdin.on('data', function (chunk) {
  // Print the chunk's type next to the chunk itself.
  console.log(typeof chunk, chunk);
});
hello world
输出
object <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64 0d 0a>
process.stdin.setEncoding('utf8'); // <- Activate!
process.stdin.on('data', function (chunk) {
  // Print the chunk's type next to the chunk itself.
  console.log(typeof chunk, chunk);
});
hello world
输出
string hello world
使用 stream-buffers 包,这非常简单:
// imports
const { WritableStreamBuffer } = require('stream-buffers');
const { promisify } = require('util');
const { createReadStream } = require('fs');
const pipeline = promisify(require('stream').pipeline);
// Source for the demo: a read stream over a file.
let stream = createReadStream('/etc/hosts');
// Destination that collects whatever the pipeline writes into it.
let buf = new WritableStreamBuffer();
// Run the pipeline; when it has completed, print the collected contents.
pipeline(stream, buf).then(function () {
  const contents = buf.getContents();
  console.log(contents.toString());
});
这对我有用,基于Node v6.7.0文档:
// Collects the whole stream into `output` and reports it through
// callback(err, output). `stream` and `callback` come from the surrounding code.
let output = '';
const outputChunks = [];
stream.on('readable', function() {
  // Keep reading until read() returns null, i.e. the internal buffer is empty
  // for now. A null read is not treated as the end of the stream; completion
  // is signalled by the 'end' event below.
  let chunk;
  while ((chunk = stream.read()) !== null) {
    // Keep raw bytes and decode once at the end, so a multi-byte character
    // split across chunks is not corrupted.
    outputChunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(String(chunk)));
  }
});
stream.on('end', function() {
  // Stream is finished: decode everything and hand it over.
  output = Buffer.concat(outputChunks).toString();
  callback(null, output);
});
stream.on('error', function(err) {
  callback(err, null);
});
// Log each chunk as text as it arrives and keep the text in `data`.
let data = [];
stream.on('data', (chunk) => {
  const text = Buffer.from(chunk).toString();
  console.log(text);
  data.push(text);
});