如何将多个流合并为一个更高级别的流?

56

我有两个流,Stream<A>Stream<B>。我有一个类型为C的构造函数,它接受一个A和一个B。如何将这两个Stream合并成一个Stream<C>

6个回答

53
import 'dart:async' show Stream;
import 'package:async/async.dart' show StreamGroup;

main() async {
  var s1 = stream(10);
  var s2 = stream(20);
  var s3 = StreamGroup.merge([s1, s2]);
  await for(int val in s3) {
    print(val);
  }
}

Stream<int> stream(int min) async* {
  int i = min;
  while(i < min + 10) {
    yield i++;
  }
}

参见 http://news.dartlang.org/2016/03/unboxing-packages-async-part-2.html

打印

10
20
11
21
12
22
13
23
14
24
15
25
16
26
17
27
18
28
19
29

3
我还没来得及查看那个库,但有一件事情我会小心谨慎,那就是单订阅流和它们缓存事件的能力。如果很多事件被送到一个流上而另一个流饿了,第一个流将不得不缓存事件,同时等待第二个流上的事件。 - Argenti Apparatus
1
或者他们暂停订阅。如果StreamController设置正确,它在暂停时不应该产生事件(根据用例可能不可能)。 - Günter Zöchbauer
1
假设如果需要进行此类处理,则可以获得有关A和B流事件到达的某些保证,并定义出发生错误的条件。 - Argenti Apparatus
1
根据目前的情况看来,@lrn对StreamZipper的利用使我只能要求Stream<A>Stream<B>上的更改数量完全相等并且在时间上匹配。这些流的数据源是Firebase数据库中的不同点,因此更新是在随机的时间点发生的,而且绝对不会以相同的数量出现。所以,实际上并不符合要求。无论如何,我还是接受它,因为它回答了提出的问题。 - Brett
3
StreamZip不需要A和B事件同时到达。当它从一个流接收到一个事件时,它会等待另一个流的事件。我不确定如果其中一个流关闭,StreamZip会做什么,但我认为它会停止监听其他流。然而,如果你的A和B事件没有真正地“配对”,那么我认为流合并的方法对你可能不适用。 - Argenti Apparatus
显示剩余5条评论

25

你可以使用 package:async 中的StreamZip 将两个流组合成一对流,然后从中创建 C 对象。

import "package:async" show StreamZip;
...
Stream<C> createCs(Stream<A> as, Stream<B> bs) =>
  new StreamZip([as, bs]).map((ab) => new C(ab[0], ab[1]));

23
请注意,StreamZip 只会发出成对的数据。如果输入流 A 和 B 的数据发射速率不同,C 会等待直到两个流都发射了一条数据后再将它们合并成一对数据进行发射。这可能符合人们的预期,也可能不符合。 - WSBT
1
我看不到其他的选择。在你同时拥有 A 和 B 之前,你无法创建一个新的 C 对象。除非每次收到 AB 时都创建一个新的 C,并使用另一个值的旧值。如果在第一个 B 之前收到两个 A,仍然会出现问题。 - lrn
1
@Irn combineLatest 通过等待每个流至少发出一个值来解决此问题:https://pub.dev/documentation/stream_transform/latest/stream_transform/CombineLatest/combineLatest.html - masterwok

22
如果需要在Stream<A>Stream<B>发出事件时做出反应,并使用两个流的最新值,则使用combineLatest
Stream<C> merge(Stream<A> streamA, Stream<B> streamB) {
  return streamA
    .combineLatest(streamB, (a, b) => new C(a, b));
}

10
只有在两个流至少都发出了一个事件时,它才会触发事件。因此,当只有一个流发出事件时,您将不会收到通知。有人可能会认为这是自然的解释,例如我只会得到A流的结果和B流的空值,但实际情况并非如此。 - scrimau
3
@scrimau 很好的建议。如果这是一个问题,您可能想要使用startWith - cambunctious

20

对于需要合并两个以上不同类型流并在任何一个流更新时获取所有最新值的人。

import 'package:stream_transform/stream_transform.dart';

Stream<List> combineLatest(Iterable<Stream> streams) {
  final Stream<Object> first = streams.first.cast<Object>();
  final List<Stream<Object>> others = [...streams.skip(1)];
  return first.combineLatestAll(others);
}

合并后的流将产生以下结果:
streamA:  a----b------------------c--------d---|
streamB:  --1---------2-----------------|
streamC:  -------&----------%---|
combined: -------b1&--b2&---b2%---c2%------d2%-|

为什么不使用StreamZip?因为StreamZip会产生以下问题:

streamA:  a----b------------------c--------d---|
streamB:  --1---------2-----------------|
streamC:  -------&----------%---|
combined: -------a1&-------b2%--|

使用方法:

Stream<T> sA;
Stream<K> sB;
Stream<Y> sC;
combineLatest([sA, sB, sC]).map((data) {
  T resA = data[0];
  K resB = data[1];
  Y resC = data[2];
  return D(resA, resB, resC);
});

2

使用rxdart,您可以使用CombineLatestStream来实现您想要的功能。请注意,新流在所有流至少发出一个事件之前不会返回任何值:

combineLatest-marbles

您可以使用CombineLatestStream.list()创建组合流:
import 'package:rxdart/rxdart.dart';

Stream<A> s1 = Stream.fromIterable([A()]);
Stream<B> s2 = Stream.fromIterable([B()]);
Stream<dynamic> s3 = CombineLatestStream.list<dynamic>([s1, s2])
  ..listen(
    (value) {
      A a = value[0];
      B b = value[1];
    },
  );

由于Dart不支持联合类型,CombineLatestStream.list()的一个缺点是来自不同类型流的事件之后应该进行转换(由于List<dynamic>)。另一种方法是使用CombineLatestStream.combine2()(例如使用创建Tuple2combiner)来保持类型。


1
要在第二个流从第一个流中获取结果时将两个流合并,请使用 asyncExpand
  Stream<UserModel?> getCurrentUserModelStream() {
    return FirebaseAuth.instance.authStateChanges().asyncExpand<UserModel?>(
      (currentUser) {
        if (currentUser == null) {
          return Stream.value(null);
        }
        return FirebaseFirestore.instance
            .collection('users')
            .doc(currentUser.uid)
            .snapshots()
            .map((doc) {
          final userData = doc.data();
          if (userData == null) {
            return null;
          }
          return UserModel.fromJson(userData);
        });
      },
    );
  }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接