一个AsyncIterable和一个Observable之间的实际区别是什么?

10

最近我一直在思考这个话题。似乎AsyncIterables和Observables都具有类似于流的特性,尽管它们的消费方式略有不同。

你可以像这样使用Async Iterable进行消费

const myAsyncIterable = async function*() { yield 1; yield 2; yield 3; }

const main = async () => {
  for await (const number of myAsyncIterable()) {
    console.log(number)
  }
}

main()

你可以这样使用可观察对象

const Observable = rxjs
const { map } = rxjs.operators

Observable.of(1, 2, 3).subscribe(x => console.log(x))
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>

我的主要问题基于这个 RxJS pull request

如果可观察对象发出的速度比循环完成的速度快,那么随着缓冲区变得更加充满,就会出现内存累积。我们可以提供使用不同策略的其他方法(例如,只使用最近的值等),但将其保留为默认设置。请注意,循环本身可能有几个等待,这会加剧问题。

我觉得异步迭代器本质上没有背压问题,因此在Observable上实现 Symbol.asyncIterator (@@asyncIterator) 并默认采用背压策略是否正确?在异步可迭代对象的情况下,甚至是否需要Observable?

理想情况下,您可以通过代码示例展示AsyncIterables和Observables之间的实际区别。


@Bergi 特别是,允许内存积累随着缓冲区变得更满的背压策略(正如贡献者所暗示的那样)。不,这对我来说听起来不像是一个好策略。此外,我可能已经回答了我的问题,但我正在寻找更详细的解释。 - richytong
如果不可能将Observables视为拉取源,那么将Observables设置为异步可迭代对象是否正确?我还想指出,供参考,Observables是一个stage 1 proposal - richytong
1
“我们需要在规范中使用Observable吗?”这是一个基于观点的问题,我们无法在此回答。但您所链接的提案文本应该对此有充分的论述和论据。 - Bergi
“异步可迭代对象能做到与可观察对象一样的事情吗?” - 不行。它们需要一个消费者来产生值。嗯,另一方面,您链接的提案中的可观察对象似乎表现相同。 - Bergi
1
非常接近(迭代器 vs 可迭代对象)的重复问题:异步生成器和可观察对象有什么区别? - Bergi
显示剩余5条评论
3个回答

11
主要区别在于决定何时迭代的一方不同。
在Async Iterators中,客户端通过调用 await iterator.next() 来决定。源头决定何时解析 promise,但客户端必须先请求下一个值。因此,消费者从源头“拉取”数据。
Observables注册一个回调函数,当新值进来时可立即由Observable进行调用。因此,源头向消费者“推送”。
Observable可以很容易地通过使用Subject并将其映射到异步迭代器的下一个值来使用。然后,每当准备好消耗下一个项时,您就会调用主题上的 next。以下是代码示例。
const pull = new Subject();
const output = pull.pipe(
  concatMap(() => from(iter.next())),
  map(val => { 
    if(val.done) pull.complete();
    return val.value;
  })
);
//wherever you need this 
output.pipe(

).subscribe(() => {
  //we're ready for the next item
  if(!pull.closed) pull.next();
});


从消费者的角度来看,Observable 有什么能力是 AsyncIterable 无法做到的? - richytong
我正在寻找一些实用的东西,带有代码示例。 - richytong

0

可观察的东西令人难以置信,而我的理解可能有缺陷。但是异步迭代器只是一个返回承诺的迭代器,这些承诺可以解决未来事件的问题,并在“实时”事件流(热可观察对象)中触发。它可以使用队列来实现,如下所示。

function* iterateClickEvents(target) {
  const queue = []
  target.addEventListener('click', e => queue.shift()?.fulfill(e))
  while (true)
    yield new Promise(fulfill => queue.push({fulfill}))
}

//use it
for await (const e of iterateClickEvents(myButton))
  handleEvent(e)

接着您可以实现像这样的流畅操作符:

class FluentIterable {
  constructor(iterable) {
    this.iterable = iterable
  }
  filter(predicate) {
    return new FluentIterable(this.$filter(predicate))
  }
  async* $filter(predicate) {
    for await (const value of this.iterable)
      if (predicate(value))
        yield value
  }
  async each(fn) {
    for await (const value of this.iterable)
      fn(value)
  }
}

//use it
new FluentIterable(iterateClickEvents(document.body))
  .filter(e => e.target == myButton)
  .each(handleEvent)
  .catch(console.error)

https://codepen.io/ken107/pen/PojZjgB

你可以实现一个 map 操作符,返回内部迭代器的结果。从那时起,事情变得复杂了。

0

是当前的实现Observable[Symbol.asyncIterator]

下面是一个在数组上实现Symbol.asyncIterator的基本示例:

const dummyPromise = (val, time) => new Promise(res => setTimeout(res, time * 1000, val));

const items = [1, 2, 3];

items[Symbol.asyncIterator] = async function * () {
  yield * await this.map(v => dummyPromise(v, v));
}

!(async () => {
  for await (const value of items) {

  console.log(value);
}
})();
/* 
1 - after 1s
2 - after 2s
3 - after 3s
*/

我理解生成器(同步生成器)的方式是它们是可暂停的函数,这意味着你可以立即请求一个值,然后在10秒后请求另一个值。异步生成器遵循相同的方法,只是它们产生的值是异步的,这意味着你必须等待它。await
例如:
const dummyPromise = (val, time) => new Promise(res => setTimeout(res, time * 1000, val));

const items = [1, 2, 3];
items[Symbol.asyncIterator] = async function * () {
  yield * await this.map(v => dummyPromise(v, v));
}

const it = items[Symbol.asyncIterator]();

(async () => {
  // console.log(await it.next())
  await it.next();

  setTimeout(async () => {
    console.log(await it.next());
  }, 2000); // It will take 4s in total
})();

回到Observable的实现:
async function* coroutine<T>(source: Observable<T>) {
  const deferreds: Deferred<IteratorResult<T>>[] = [];
  const values: T[] = [];
  let hasError = false;
  let error: any = null;
  let completed = false;

  const subs = source.subscribe({
    next: value => {
      if (deferreds.length > 0) {
        deferreds.shift()!.resolve({ value, done: false });
      } else {
        values.push(value);
      }
    },
    error: err => { /* ... */ },
    complete: () => { /* ... */ },
  });

  try {
    while (true) {
      if (values.length > 0) {
        yield values.shift();
      } else if (completed) {
        return;
      } else if (hasError) {
        throw error;
      } else {
        const d = new Deferred<IteratorResult<T>>();
        deferreds.push(d);
        const result = await d.promise;
        if (result.done) {
          return;
        } else {
          yield result.value;
        }
      }
    }
  } catch (err) {
    throw err;
  } finally {
    subs.unsubscribe();
  }
}

根据我的理解:

  • values 用于跟踪同步值 如果你有of(1, 2, 3),那么在它甚至到达while(true) { }之前,values数组将包含[1, 2, 3]。因为你正在使用for await (const v of ...), 所以你会像执行it.next(); it.next(); it.next() ...一样请求值。

    换句话说,一旦你可以从迭代器中消耗一个值,你就立即请求下一个值,直到数据生产者没有更多可提供的值为止。

  • deferreds 用于异步值 因此,在第一个it.next()时,values数组为空(意味着observable没有同步发出),因此它将回退到最后一个else,简单地创建一个promise并将其添加到deferreds中,然后等待该promise resolvereject

    当observable最终发出时,deferreds不会为空,因此等待的promise将resolve为新到达的值。

const src$ = merge(
  timer(1000).pipe(mapTo(1)),
  timer(2000).pipe(mapTo(2)),
  timer(3000).pipe(mapTo(3)),
);

!(async () => {
  for await (const value of src$) {
    console.log(value);
  }
})();

StackBlitz

{{链接1:StackBlitz}}


当你已经在使用一个 Promise 队列时,自己实现异步迭代器接口比使用异步生成器函数更容易(也更有效)。 - Bergi
有趣!你的回答让我想起了数据加载器。谢谢。我个人觉得这种方法更容易理解,尽管可能不是最有效的方法。 - Andrei Gătej

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接