这是一个复杂的任务,因此我将其分解为几个部分:
步骤1:将每个异步可迭代对象的值记录到控制台
在考虑创建异步迭代器之前,我们应该首先考虑简单地将每个迭代器的值记录到控制台。与javascript中的大多数并发任务一样,这涉及调用多个异步函数并使用Promise.all
等待它们的结果。
function merge(iterables) {
return Promise.all(
Array.from(iterables).map(async (iter) => {
for await (const value of iter) {
console.log(value);
}
}),
);
}
merge([a, b, c]);
CodeSandbox链接: https://codesandbox.io/s/tender-ives-4hijy?fontsize=14
merge
函数会从每个迭代器中记录值,但它几乎没有用处; 当所有迭代器都完成时,它返回一个解决为undefined
数组的承诺。
第二步: 用合并异步生成器替换合并函数
下一步是用调用父异步迭代器的推送函数调用替换console.log
。为此使用异步生成器需要更多代码,因为唯一将值“推送”到异步生成器的方法是使用yield
运算符,而该运算符无法在子函数范围内使用。解决方案是创建两个队列,即推送队列和拉取队列。接下来,我们定义一个push
函数,如果没有未处理的拉取,则将其推入推送队列,否则将一个值排队等待稍后拉取。最后,我们必须持续地产生来自推送队列的值或者是由push稍后调用的resolve函数所排队的承诺。以下是代码:
async function *merge(iterables) {
const pushQueue = [];
const pullQueue = [];
function push(value) {
if (pullQueue.length) {
pullQueue.pop()(value);
} else {
pushQueue.unshift(value);
}
}
const finishP = Promise.all(
Array.from(iterables).map(async (iter) => {
for await (const value of iter) {
push(value);
}
}),
);
while (true) {
if (pushQueue.length) {
yield pushQueue.pop();
} else {
yield new Promise((resolve) => {
pullQueue.unshift(resolve);
});
}
}
}
(async () => {
const limit = 9;
let i = 0;
const xs = [];
for await (const x of merge([a, b, c])) {
xs.push(x);
console.log(x);
i++;
if (i === limit) {
break;
}
}
console.log(xs);
})().catch(error => console.error(error));
CodeSandbox链接:https://codesandbox.io/s/misty-cookies-du1eg
这个代码基本上是可以使用的!如果你运行这段代码,你会发现 xs
被正确地打印了出来,但是 break
语句没有被执行,这导致了从子迭代器一直取值,从而抛出了在 c
中抛出的错误,最终导致一个未处理的 Promise rejection 错误。请注意,我们并没有对 Promise.all
的结果进行任何操作。理想情况下,当 finishP
Promise 完成时,应该返回生成器。我们只需要再加一点代码,以确保:1. 当父迭代器返回时返回子迭代器(例如,在 for await
循环中使用 break
语句),2. 当所有子迭代器都返回时返回父迭代器。
步骤3:当父迭代器返回时停止每个子迭代器,并在每个子迭代器返回时返回父迭代器。
为了确保每个子异步可迭代对象在父异步生成器返回时被正确返回,我们可以使用 finally 块来监听父异步生成器的完成情况。同时,为了确保子迭代器返回时父生成器被返回,我们可以将产生的 Promise 与 finishP
Promise 竞争。
async function *merge(iterables) {
const pushQueue = [];
const pullQueue = [];
function push(value) {
if (pullQueue.length) {
pullQueue.pop()(value);
} else {
pushQueue.unshift(value);
}
}
let stop;
const stopP = new Promise((resolve) => (stop = resolve));
let finished = false;
const finishP = Promise.all(
Array.from(iterables).map(async (iter) => {
iter = iter[Symbol.asyncIterator]();
try {
while (true) {
const result = await Promise.race([stopP, iter.next()]);
if (!result || result.done) {
return;
}
push(result.value);
}
} finally {
await iter.return && iter.return();
}
}),
).finally(() => (finished = true));
try {
while (!finished) {
if (pushQueue.length) {
yield pushQueue.pop();
} else {
const value = await Promise.race([
new Promise((resolve) => {
pullQueue.unshift(resolve);
}),
finishP,
]);
if (!finished) {
yield value;
}
}
}
await finishP;
} finally {
stop();
}
}
CodeSandbox链接:https://codesandbox.io/s/vigilant-leavitt-h247u
在此代码可用于生产之前,我们需要完成一些工作。例如,连续从子迭代器中提取值而不等待父迭代器提取它们。这与pushQueue
是无界数组的事实相结合,如果父迭代器以较慢的速度提取值,则可能会导致内存泄漏。
此外,合并迭代器返回undefined
作为其最终值,但您可能希望最终值是来自最后完成的子迭代器的最终值。
如果您正在寻找一个小而专注的库,其中具有类似上述合并函数的库,涵盖了更多用例和边缘情况,请查看我编写的Repeater.js。它定义了静态方法Repeater.merge
,该方法执行我上述描述的操作。它还提供了一个干净的API,用于将基于回调的API转换为承诺和其他组合器静态方法,以其他方式组合异步迭代器。
a
,b
和c
实际上以不同的速度运行,以便我们可以观察到预期的结果? - Bergibabel-node
来执行它。也许可以在meta上提一个话题? - sdgfsdh