- 期望行为
- 实际行为
- 尝试过的方法
- 复现步骤
- 研究
期望行为
将从多个API请求中收到的多个可读流(pipe multiple readable streams)导入单个可写流(single writeable stream)。
API响应来自IBM-Watson的textToSpeech.synthesize()方法。
之所以需要多个请求,是因为该服务对文本输入有5KB的限制。
例如,18KB的字符串需要四个请求才能完成。
实际行为
可写流文件不完整且混乱。
应用程序似乎“挂起”了。
当我尝试在音频播放器中打开不完整的.mp3文件时,它会说它已损坏。
打开和关闭文件的过程似乎会增加其文件大小——好像打开文件会促使更多的数据流入其中。
对于较大的输入,例如四个小于4000字节的字符串,这种不良行为更为明显。
我尝试过的事情
我尝试了几种方法,使用npm包combined-stream, combined-stream2, multistream和archiver将可读流导入单个可写流或多个可写流,但它们都会导致文件不完整。我的最后一次尝试没有使用任何包,详见下面的重现步骤
部分。因此,我质疑应用程序逻辑的每个部分:
文本转语音文档中说,API响应类型为:01. Watson文本转语音API请求的响应类型是什么?
Response type: NodeJS.ReadableStream|FileObject|Buffer
我很困惑响应类型是三种可能的其中之一。
在尝试中,我一直假设它是一个可读流
。
02. 我可以在map函数中进行多个api请求吗?
03. 我可以在
promise()
中包装每个请求并解决response
吗?04. 我可以将生成的数组分配给
promises
变量吗?05. 我可以声明
var audio_files = await Promise.all(promises)
吗?06. 在此声明之后,所有响应是否都已“完成”?
07. 如何正确地将每个响应导入可写流?
08. 如何检测所有管道何时完成,以便我可以将文件发送回客户端?
对于问题2-6,我认为答案是“是”。
我认为我的失败与问题7和8有关。
重现步骤
您可以使用具有相应字节大小的四个随机生成的文本字符串的数组测试此代码,分别为3975
、3863
、3974
和3629
字节 - 这里有一个pastebin的数组。
// route handler
app.route("/api/:api_version/tts")
.get(api_tts_get);
// route handler middleware
const api_tts_get = async (req, res) => {
var query_parameters = req.query;
var file_name = query_parameters.file_name;
var text_string_array = text_string_array; // eg: https://pastebin.com/raw/JkK8ehwV
var absolute_path = path.join(__dirname, "/src/temp_audio/", file_name);
var relative_path = path.join("./src/temp_audio/", file_name); // path relative to server root
// for each string in an array, send it to the watson api
var promises = text_string_array.map(text_string => {
return new Promise((resolve, reject) => {
// credentials
var textToSpeech = new TextToSpeechV1({
iam_apikey: iam_apikey,
url: tts_service_url
});
// params
var synthesizeParams = {
text: text_string,
accept: 'audio/mp3',
voice: 'en-US_AllisonV3Voice'
};
// make request
textToSpeech.synthesize(synthesizeParams, (err, audio) => {
if (err) {
console.log("synthesize - an error occurred: ");
return reject(err);
}
resolve(audio);
});
});
});
try {
// wait for all responses
var audio_files = await Promise.all(promises);
var audio_files_length = audio_files.length;
var write_stream = fs.createWriteStream(`${relative_path}.mp3`);
audio_files.forEach((audio, index) => {
// if this is the last value in the array,
// pipe it to write_stream,
// when finished, the readable stream will emit 'end'
// then the .end() method will be called on write_stream
// which will trigger the 'finished' event on the write_stream
if (index == audio_files_length - 1) {
audio.pipe(write_stream);
}
// if not the last value in the array,
// pipe to write_stream and leave open
else {
audio.pipe(write_stream, { end: false });
}
});
write_stream.on('finish', function() {
// download the file (using absolute_path)
res.download(`${absolute_path}.mp3`, (err) => {
if (err) {
console.log(err);
}
// delete the file (using relative_path)
fs.unlink(`${relative_path}.mp3`, (err) => {
if (err) {
console.log(err);
}
});
});
});
} catch (err) {
console.log("there was an error getting tts");
console.log(err);
}
}
官方示例显示:
textToSpeech.synthesize(synthesizeParams)
.then(audio => {
audio.pipe(fs.createWriteStream('hello_world.mp3'));
})
.catch(err => {
console.log('error:', err);
});
这似乎对单个请求有效,但据我所知对于多个请求并不适用。
研究有关可读流和可写流、可读流模式(流动和暂停)、'data'、'end'、'drain'和'finish'事件、pipe()、fs.createReadStream()和fs.createWriteStream()的内容。
几乎所有的Node.js应用程序,无论多么简单,都会以某种方式使用流{{streams}}...
const server = http.createServer((req, res) => {
// `req` is an http.IncomingMessage, which is a Readable Stream
// `res` is an http.ServerResponse, which is a Writable Stream
let body = '';
// get the data as utf8 strings.
// if an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');
// readable streams emit 'data' events once a listener is added
req.on('data', (chunk) => {
body += chunk;
});
// the 'end' event indicates that the entire body has been received
req.on('end', () => {
try {
const data = JSON.parse(body);
// write back something interesting to the user:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});
https://nodejs.org/api/stream.html#stream_api_for_stream_consumers
可读流有两种主要模式,影响我们对其进行消费的方式...它们可以是
暂停
模式或流动
模式。所有可读流默认都以暂停模式启动,但可以在需要时轻松切换到流动
模式并返回暂停
模式...只需添加data
事件处理程序即可将暂停的流切换到流动
模式,删除data
事件处理程序则将流切换回暂停
模式。
https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93
以下是使用可读和可写流时可以使用的重要事件和功能列表。 在可读流中最重要的事件包括:
- `data` 事件,每当流向使用者传递一块数据时就会触发该事件。 - `end` 事件,在从流中没有更多数据可供使用时触发。
在可写流中最重要的事件包括:
- `drain` 事件,表示可写流可以接收更多数据。 - `finish` 事件,在所有数据已经刷新到底层系统时触发。
https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93
.pipe()
方法会监听来自fs.createReadStream()
的 'data' 和 'end' 事件。
https://github.com/substack/stream-handbook#why-you-should-use-streams
.pipe()
是一个函数,它接收一个可读的源流src,并将输出连接到目标可写流dst
。
https://github.com/substack/stream-handbook#pipe
pipe()
方法的返回值是目标流{{destination stream}}。
https://flaviocopes.com/nodejs-streams/#pipe
默认情况下,在源
Readable
流发出'end'
事件时,目标Writable
流上会调用stream.end(),以便目标不再可写。要禁用此默认行为,可以将选项end
传递为false
,从而使目标流保持打开状态:
https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options
'finish'
事件在调用stream.end()
方法并将所有数据刷新到底层系统后发出。
const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`);
}
writer.end('This is the end\n');
writer.on('finish', () => {
console.log('All writes are now complete.');
});
https://nodejs.org/api/stream.html#stream_event_finish
如果您想读取多个文件并将它们导入可写流中,您需要将每个文件都导入可写流,并在执行此操作时传递
end: false
,因为默认情况下,当没有更多数据可读取时,可读流会结束可写流。以下是一个示例:var ws = fs.createWriteStream('output.pdf');
fs.createReadStream('pdf-sample1.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample2.pdf').pipe(ws, { end: false });
fs.createReadStream('pdf-sample3.pdf').pipe(ws);
https://dev59.com/IIzda4cB1Zd3GeqPiByf#30916248
你想将第二个读取操作添加到第一个读取结束的事件监听器中...
var a = fs.createReadStream('a');
var b = fs.createReadStream('b');
var c = fs.createWriteStream('c');
a.pipe(c, {end:false});
a.on('end', function() {
b.pipe(c)
}
https://dev59.com/LHzaa4cB1Zd3GeqPQXvR#28033554
相关的谷歌搜索:
如何将多个可读流导入单个可写流?nodejs
涵盖相同或类似主题的问题,没有权威答案(或可能已过时):
NodeJS.ReadableStream|FileObject|Buffer
?那么我就可以更好地了解如何将它们连接并写入文件。谢谢。 - user1063287audio.pipe(fs.createWriteStream('hello_world.wav'));
。 - chughts