我该如何交错/合并异步可迭代对象?

33

假设我有一些像这样的异步可迭代对象:

const a = {
  [Symbol.asyncIterator]: async function * () {
    yield 'a';
    await sleep(1000);
    yield 'b';
    await sleep(2000);
    yield 'c';
  }, 
};

const b = {
  [Symbol.asyncIterator]: async function * () {
    await sleep(6000);
    yield 'i';
    yield 'j';
    await sleep(2000);
    yield 'k';
  }, 
};

const c = {
  [Symbol.asyncIterator]: async function * () {
    yield 'x';
    await sleep(2000);
    yield 'y';
    await sleep(8000);
    yield 'z';
    await sleep(10000);
    throw new Error('You have gone too far! ');
  }, 
};

为了完整起见:

// Promisified sleep function
const sleep = ms => new Promise((resolve, reject) => {
  setTimeout(() => resolve(ms), ms);
});

现在,假设我可以像这样连接它们:

const abcs = async function * () {
  yield * a;
  yield * b;
  yield * c;
};

前9个产生的项目将是:

(async () => {
  const limit = 9;
  let i = 0; 
  const xs = [];
  for await (const x of abcs()) {
    xs.push(x);
    i++;
    if (i === limit) {
      break;
    }
  }
  console.log(xs);
})().catch(error => console.error(error));

// [ 'a', 'b', 'c', 'i', 'j', 'k', 'x', 'y', 'z' ]

但是请想象一下,我并不关心顺序abc以不同的速度产生结果,我希望尽可能快地产生结果

如何重写此循环,以便x能够在最短时间内产生结果,而不考虑顺序?


abc还可能是无限序列,因此解决方案不能要求将所有元素缓存到数组中。


1
除了修复您当前的代码以使其可运行(如@T.J.Crowder建议的那样),您能否提供一个示例,其中abc实际上以不同的速度运行,以便我们可以观察到预期的结果? - Bergi
下次请确保您的示例可运行(我之前已经链接了这个,但那个礼貌的评论不知何故消失了:https://meta.stackoverflow.com/questions/358992/),因为A)它可以帮助您避免像这个问题两次发布不正确的代码,B)它可以让人们轻松地证明他们的解决方案是否有效(在发布之前自己测试,在发布后向他人展示)。祝编码愉快! - T.J. Crowder
@T.J.Crowder 我无法在SO上发布可工作的代码,因为我的浏览器不支持此语法。我只能在本地使用babel-node来执行它。也许可以在meta上提一个话题? - sdgfsdh
也许您想查看这个讨论 - Redu
我在npm上找到的最干净的解决方案在这里:https://github.com/tungv/async-generator/blob/master/packages/merge/index.js - polkovnikov.ph
显示剩余4条评论
7个回答

15

无法使用循环语句编写此内容。 async/await 代码总是按顺序执行,要并发执行操作,需要直接使用 Promise 组合器。对于普通的 Promise,有 Promise.all,对于异步迭代器还没有 (尚未) 相应的方法,因此我们需要自己编写:

async function* combine(iterable) {
    const asyncIterators = Array.from(iterable, o => o[Symbol.asyncIterator]());
    const results = [];
    let count = asyncIterators.length;
    const never = new Promise(() => {});
    function getNext(asyncIterator, index) {
        return asyncIterator.next().then(result => ({
            index,
            result,
        }));
    }
    const nextPromises = asyncIterators.map(getNext);
    try {
        while (count) {
            const {index, result} = await Promise.race(nextPromises);
            if (result.done) {
                nextPromises[index] = never;
                results[index] = result.value;
                count--;
            } else {
                nextPromises[index] = getNext(asyncIterators[index], index);
                yield result.value;
            }
        }
    } finally {
        for (const [index, iterator] of asyncIterators.entries())
            if (nextPromises[index] != never && iterator.return != null)
                iterator.return();
        // no await here - see https://github.com/tc39/proposal-async-iteration/issues/126
    }
    return results;
}
注意,combine不支持通过.throw或者.return传递值给next 或者取消操作。
你可以这样调用它:
(async () => {
  for await (const x of combine([a, b, c])) {
    console.log(x);
  }
})().catch(console.error);

1
@PatrickRoberts 是的,它收集涉及生成器函数的最终return值,类似于Promise.all的工作方式。虽然它并不经常使用,但您可以在async function*中使用console.log(yield* combine(…))来观察它。 - Bergi
1
我唯一需要添加的可能是一个 try/finally 子句,在突然完成的情况下关闭未完成的迭代器。 - Bergi
1
@brainkim 基本上,我们必须故意部署 Promise 构造函数反模式 和延迟模式。我们不再保留 getNext() 返回的 promise 数组,而是只安装两个处理程序,其中包含对当前竞赛解析器的可变引用:let resolve,reject; for (const [index, asyncIterator] of asyncIterators.entries()) asyncIterator.next().then(result => { resolve({result, index}); }, err => { reject(err); });const {index, result} = await new Promise((res, rej) => { resolve = res; reject = rej; }); - Bergi
1
@vitaly-t 简单地转译它 :-) 但实际上,唯一的 await 在那个 while 循环中,所以将其转换为递归方法相当容易。 - Bergi
1
@vitaly-t 它确实会跟踪它们 - 承诺被保存在 nextPromises 中,并在那里保持,直到它解决,即使来自其他迭代器的承诺提前实现。一旦其中一个承诺被拒绝,迭代器就会抛出该错误并关闭。 - Bergi
显示剩余22条评论

6
如果我将 abcs 更改为接受生成器进行处理,那么我会得到下面的代码,请参见内联注释:
const abcs = async function * (...gens) {
  // Worker function to queue up the next result
  const queueNext = async (e) => {
    e.result = null; // Release previous one as soon as possible
    e.result = await e.it.next();
    return e;
  };
  // Map the generators to source objects in a map, get and start their
  // first iteration
  const sources = new Map(gens.map(gen => [
    gen,
    queueNext({
      key: gen,
      it:  gen[Symbol.asyncIterator]()
    })
  ]));
  // While we still have any sources, race the current promise of
  // the sources we have left
  while (sources.size) {
    const winner = await Promise.race(sources.values());
    // Completed the sequence?
    if (winner.result.done) {
      // Yes, drop it from sources
      sources.delete(winner.key);
    } else {
      // No, grab the value to yield and queue up the next
      // Then yield the value
      const {value} = winner.result;
      sources.set(winner.key, queueNext(winner));
      yield value;
    }
  }
};

实时示例:

// Promisified sleep function
const sleep = ms => new Promise((resolve, reject) => {
  setTimeout(() => resolve(ms), ms);
});

const a = {
  [Symbol.asyncIterator]: async function * () {
    yield 'a';
    await sleep(1000);
    yield 'b';
    await sleep(2000);
    yield 'c';
  }, 
};

const b = {
  [Symbol.asyncIterator]: async function * () {
    await sleep(6000);
    yield 'i';
    yield 'j';
    await sleep(2000);
    yield 'k';
  }, 
};

const c = {
  [Symbol.asyncIterator]: async function * () {
    yield 'x';
    await sleep(2000);
    yield 'y';
    await sleep(8000);
    yield 'z';
  }, 
};

const abcs = async function * (...gens) {
  // Worker function to queue up the next result
  const queueNext = async (e) => {
    e.result = null; // Release previous one as soon as possible
    e.result = await e.it.next();
    return e;
  };
  // Map the generators to source objects in a map, get and start their
  // first iteration
  const sources = new Map(gens.map(gen => [
    gen,
    queueNext({
      key: gen,
      it:  gen[Symbol.asyncIterator]()
    })
  ]));
  // While we still have any sources, race the current promise of
  // the sources we have left
  while (sources.size) {
    const winner = await Promise.race(sources.values());
    // Completed the sequence?
    if (winner.result.done) {
      // Yes, drop it from sources
      sources.delete(winner.key);
    } else {
      // No, grab the value to yield and queue up the next
      // Then yield the value
      const {value} = winner.result;
      sources.set(winner.key, queueNext(winner));
      yield value;
    }
  }
};

(async () => {
  console.log("start");
  for await (const x of abcs(a, b, c)) {
    console.log(x);
  }
  console.log("done");
})().catch(error => console.error(error));
.as-console-wrapper {
  max-height: 100% !important;
}


1
不错!我认为你可以简化代码,只需使用 Promise 的 Map,使用 sources.set(winner.key, queueNext(winner)) 替代 winner.next =。这样你就不需要在 race 调用中使用 map,而且没有了 next 字段,sources 初始化也变得更短。 - Bergi
@Bergi - 你是对的。等我终于弄明白了这个问题,我已经超出了自己规定的时间预算。 :-) 所以既然它可行,我就停了下来。但是...没错,你是对的,我编辑了一下,现在运作得很好。 - T.J. Crowder
@Bergi - 哈哈,说得好,一旦我摆脱了map,我就不再需要在Promise.race中使用数组了。 :-) 我已经采纳了你的一些修改。我更喜欢在承诺挂起时将result设置为null,提前释放上一个对象... - T.J. Crowder
@T.J.Crowder 我加入了我的看法。很希望能够得到我的答案的审查。 - Ben

2

如果有人发现有用的话,这里是目前被接受的答案的typescript版本:


const combineAsyncIterables = async function* <T>(
  asyncIterables: AsyncIterable<T>[],
): AsyncGenerator<T> {
  const asyncIterators = Array.from(asyncIterables, (o) =>
    o[Symbol.asyncIterator](),
  );
  const results = [];
  let count = asyncIterators.length;
  const never: Promise<never> = new Promise(noOp);
  const getNext = (asyncIterator: AsyncIterator<T>, index: number) =>
    asyncIterator.next().then((result) => ({ index, result }));

  const nextPromises = asyncIterators.map(getNext);
  try {
    while (count) {
      const { index, result } = await Promise.race(nextPromises);
      if (result.done) {
        nextPromises[index] = never;
        results[index] = result.value;
        count--;
      } else {
        nextPromises[index] = getNext(asyncIterators[index], index);
        yield result.value;
      }
    }
  } finally {
    for (const [index, iterator] of asyncIterators.entries()) {
      if (nextPromises[index] != never && iterator.return != null) {
        // no await here - see https://github.com/tc39/proposal-async-iteration/issues/126
        void iterator.return();
      }
    }
  }
  return results;
}; 

2

这是一个复杂的任务,因此我将其分解为几个部分:

步骤1:将每个异步可迭代对象的值记录到控制台

在考虑创建异步迭代器之前,我们应该首先考虑简单地将每个迭代器的值记录到控制台。与javascript中的大多数并发任务一样,这涉及调用多个异步函数并使用Promise.all等待它们的结果。

function merge(iterables) {
  return Promise.all(
    Array.from(iterables).map(async (iter) => {
      for await (const value of iter) {
        console.log(value);
      }
    }),
  );
}

// a, b and c are the async iterables defined in the question
merge([a, b, c]); // a, x, b, y, c, i, j, k, z, Error: you have gone too far!

CodeSandbox链接: https://codesandbox.io/s/tender-ives-4hijy?fontsize=14

merge函数会从每个迭代器中记录值,但它几乎没有用处; 当所有迭代器都完成时,它返回一个解决为undefined数组的承诺。

第二步: 用合并异步生成器替换合并函数

下一步是用调用父异步迭代器的推送函数调用替换console.log。为此使用异步生成器需要更多代码,因为唯一将值“推送”到异步生成器的方法是使用yield运算符,而该运算符无法在子函数范围内使用。解决方案是创建两个队列,即推送队列和拉取队列。接下来,我们定义一个push函数,如果没有未处理的拉取,则将其推入推送队列,否则将一个值排队等待稍后拉取。最后,我们必须持续地产生来自推送队列的值或者是由push稍后调用的resolve函数所排队的承诺。以下是代码:

async function *merge(iterables) {
  // pushQueue and pullQueue will never both contain values at the same time.
  const pushQueue = [];
  const pullQueue = [];
  function push(value) {
    if (pullQueue.length) {
      pullQueue.pop()(value);
    } else {
      pushQueue.unshift(value);
    }
  }

  // the merge code from step 1
  const finishP = Promise.all(
    Array.from(iterables).map(async (iter) => {
      for await (const value of iter) {
        push(value);
      }
    }),
  );

  while (true) {
    if (pushQueue.length) {
      yield pushQueue.pop();
    } else {
      // important to note that yield in an async generator implicitly awaits promises.
      yield new Promise((resolve) => {
        pullQueue.unshift(resolve);
      });
    }
  }
}

// code from the question
(async () => {
  const limit = 9;
  let i = 0; 
  const xs = [];
  for await (const x of merge([a, b, c])) {
    xs.push(x);
    console.log(x);
    i++;
    if (i === limit) {
      break;
    }
  }
  console.log(xs); // ["a", "x", "b", "y", "c", "i", "j", "k", "z"]
})().catch(error => console.error(error));

CodeSandbox链接:https://codesandbox.io/s/misty-cookies-du1eg

这个代码基本上是可以使用的!如果你运行这段代码,你会发现 xs 被正确地打印了出来,但是 break 语句没有被执行,这导致了从子迭代器一直取值,从而抛出了在 c 中抛出的错误,最终导致一个未处理的 Promise rejection 错误。请注意,我们并没有对 Promise.all 的结果进行任何操作。理想情况下,当 finishP Promise 完成时,应该返回生成器。我们只需要再加一点代码,以确保:1. 当父迭代器返回时返回子迭代器(例如,在 for await 循环中使用 break 语句),2. 当所有子迭代器都返回时返回父迭代器。

步骤3:当父迭代器返回时停止每个子迭代器,并在每个子迭代器返回时返回父迭代器。

为了确保每个子异步可迭代对象在父异步生成器返回时被正确返回,我们可以使用 finally 块来监听父异步生成器的完成情况。同时,为了确保子迭代器返回时父生成器被返回,我们可以将产生的 Promise 与 finishP Promise 竞争。

async function *merge(iterables) {
  const pushQueue = [];
  const pullQueue = [];
  function push(value) {
    if (pullQueue.length) {
      pullQueue.pop()(value);
    } else {
      pushQueue.unshift(value);
    }
  }
  // we create a promise to race calls to iter.next
  let stop;
  const stopP = new Promise((resolve) => (stop = resolve));
  let finished = false;
  const finishP = Promise.all(
    Array.from(iterables).map(async (iter) => {
      // we use the iterator interface rather than the iterable interface
      iter = iter[Symbol.asyncIterator]();
      try {
        while (true) {
          // because we can’t race promises with for await, we have to call iter.next manually
          const result = await Promise.race([stopP, iter.next()]);
          if (!result || result.done) {
            return;
          }
          push(result.value);
        }
      } finally {
        // we should be a good citizen and return child iterators
        await iter.return && iter.return();
      }
    }),
  ).finally(() => (finished = true));

  try {
    while (!finished) {
      if (pushQueue.length) {
        yield pushQueue.pop();
      } else {
        const value = await Promise.race([
          new Promise((resolve) => {
            pullQueue.unshift(resolve);
          }),
          finishP,
        ]);
        if (!finished) {
          yield value;
        }
      }
    }

    // we await finishP to make the iterator catch any promise rejections
    await finishP;
  } finally {
    stop();
  }
}

CodeSandbox链接:https://codesandbox.io/s/vigilant-leavitt-h247u

在此代码可用于生产之前,我们需要完成一些工作。例如,连续从子迭代器中提取值而不等待父迭代器提取它们。这与pushQueue是无界数组的事实相结合,如果父迭代器以较慢的速度提取值,则可能会导致内存泄漏。

此外,合并迭代器返回undefined作为其最终值,但您可能希望最终值是来自最后完成的子迭代器的最终值。

如果您正在寻找一个小而专注的库,其中具有类似上述合并函数的库,涵盖了更多用例和边缘情况,请查看我编写的Repeater.js。它定义了静态方法Repeater.merge,该方法执行我上述描述的操作。它还提供了一个干净的API,用于将基于回调的API转换为承诺和其他组合器静态方法,以其他方式组合异步迭代器。


0

我使用异步生成器解决了这个问题。(要是我几天前就找到这个问题就好了,能省点时间) 非常乐意听取意见和批评。

async function* mergen(...gens) {
  const promises = gens.map((gen, index) =>
    gen.next().then(p => ({...p, gen}))
  );

  while (promises.length > 0) {
    yield race(promises).then(({index, value: {value, done, gen}}) => {
      promises.splice(index, 1);
      if (!done)
        promises.push(
          gen.next().then(({value: newVal, done: newDone}) => ({
            value: newVal,
            done: newDone,
            gen
          }))
        );
      return value;
    });
  }
};

// Needed to implement race to provide index of resolved promise
function race(promises) {
  return new Promise(resolve =>
    promises.forEach((p, index) => {
      p.then(value => {
        resolve({index, value});
      });
    })
  );
}

我花了很多时间才找到它,而且我非常兴奋,所以我把它放在了一个npm包中 :) https://www.npmjs.com/package/mergen


你的npm包中的“Usage”似乎与实际用法不符。例如:const {mergen} = require('mergen.js') -> const mergen = require('mergen') - Patrick Roberts
{...p, gen} 的结果会有一个名为 gen 的值吗? - sdgfsdh
@Bergi 我一开始也是这么想的,但是一旦你开始删除元素,它就不起作用了。此时存储在承诺中的索引失去了所有意义。 - Ben
@Ben 是的,你将不得不在每次迭代中执行它(就像你当前的“race”实现一样),而不是在“while”循环之前。 - Bergi
1
假设您有N个生成器,总共生成M个承诺;这个算法是O(N * M),理想情况下应该是O(M)。如果数组中的第0个承诺每次都赢得比赛,您将调用promises.splice(0,1)(这是O(N)),并在每次承诺完成时将新承诺推到末尾。相反,使用Map,例如此答案,可以使删除承诺为O(1)。 - Dan Fabulich
显示剩余2条评论

0

解决方案:IxJS

我们可以使用JavaScript的交互式扩展(IxJS)文档)来轻松实现:

import { merge } from 'ix/asynciterable'

const d = merge(a, b, c)

for await (const i of d) {
  console.info('merged:', i)
}

将会得到结果:

$ ./src/t.ts 
merged a
merged x
merged b
merged y
merged c
merged i
merged j
merged k
merged z
Error: You have gone too far! 
    at Object.[Symbol.asyncIterator]

完整代码示例

const sleep = ms => new Promise((resolve) => {
  setTimeout(() => resolve(ms), ms);
});

const a = {
  [Symbol.asyncIterator]: async function * () {
    yield 'a';
    await sleep(1000);
    yield 'b';
    await sleep(2000);
    yield 'c';
  },
};

const b = {
  [Symbol.asyncIterator]: async function * () {
    await sleep(6000);
    yield 'i';
    yield 'j';
    await sleep(2000);
    yield 'k';
  },
};

const c = {
  [Symbol.asyncIterator]: async function * () {
    yield 'x';
    await sleep(2000);
    yield 'y';
    await sleep(8000);
    yield 'z';
    await sleep(10000);
    throw new Error('You have gone too far! ');
  },
};

const d = IxAsynciterable.merge(a, b, c)

async function main () {
  for await (const i of d) {
    console.info('merged', i)
  }
}

main().catch(console.error)
<script src="https://unpkg.com/ix@4.5.2/Ix.dom.asynciterable.es2015.min.js"></script>


-3

我希望我正确理解了你的问题,这是我的解决方案:

let results = [];

Promise.all([ a, b, c ].map(async function(source) {
    for await (let item of source) {
        results.push(item);
    }
}))
.then(() => console.log(results));

我用了三个普通数组试了一下:

var a = [ 1, 2, 3 ];
var b = [ 4, 5, 6 ];
var c = [ 7, 8, 9 ];

结果是[1, 4, 7, 2, 5, 8, 3, 6, 9]


这是一个不错的尝试,你的想法是正确的。然而,如果 abc 永远不终止,那么它就无法工作,这可能是情况。我会更新问题以澄清这一点。 - sdgfsdh
嗯,那这是不是有点像 socket_select() 的情况?你有一堆可能是无限的来源,而你总是想要获取下一个可用的值? - Máté Safranka
是的,与数组不同,可迭代对象可能永远不会结束。这是有效的:async function * () { while (true) { yield 0; } } - sdgfsdh

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接