如何将两个Riverpod StreamProvider组合在一起,其中一个流依赖于另一个流的数据?

6
有时我觉得我已经理解提供者(provider)的逻辑,但有时候我会因尝试以下操作而被卡住数小时。
我需要从Firestore集合流(collection stream)中获取连接ID的列表。容易。
但是,我需要将这个流式连接ID列表输入另一个Firestore集合流(collection stream)。如下所示,你可以看到我的upcomingEventsStreamProvider正在监听数据库和connectionsStream。在Riverpod或Firestore中没有抛出任何错误。然而,在日志(logs)中,我按以下顺序看到我的打印输出:
returned data
returned null stream value

我怎么滥用Riverpod提供程序的权限呢?哈哈。

final connectionsStreamProvider = StreamProvider<List<UidConnections>>((ref) {
  final database = ref.watch(databaseProvider);
  return database != null ? database.connectionsStream() : Stream.value(null);
});

final connectionsListStateProvider = StateProvider<List>((ref) => []);

final upcomingEventsStreamProvider = StreamProvider<List<SpecialEvents>>((ref) {
  final database = ref.watch(databaseProvider);
  final connectionsStream = ref.watch(connectionsStreamProvider);
  if (database != null && connectionsStream != null) {
    connectionsStream.whenData((data) {
      if (data != null) {
        data.forEach((event) {
          if (event?.active == true && event?.connectedUid != null) {
            ref
                .read(connectionsListStateProvider)
                .state
                .add(event.connectedUid);
          }
        });
        print('returned data');
        return database.upcomingSpecialEventsStream(
            ref.read(connectionsListStateProvider).state);
      }
    });
  }

  print('returned null stream value');
  return Stream.value(null);
});

或者,我只需要重构我的 Firebase Cloud Firestore 查询以先获取连接ID流吗?我更愿意使用 Riverpod,因为我仍然需要一个仅包含连接ID的流。


我不确定这是否能解决你的整个问题,但你应该添加一个返回语句:return connectionsStream.whenData... 因为你正在返回数据库的upcomingSpecialEventsStream,但没有从whenData中返回它。 - Alex Hartford
很遗憾,我尝试过了,但它只会返回AsyncValue。 - Zelf
没错,但应该是一个 AsyncValue<Stream>,你可以使用 AsyncValue<Stream>.when(data, loading, err) 来读取它。 - Alex Hartford
当添加返回值时,返回类型为AsyncValue<Null>。 - Zelf
我看到了 @double-beep。回答已发布。 - Zelf
2个回答

8

仍然困惑于如何只使用Riverpod来实现合并两个流。但我成功解决了我的问题,以下是给那些遇到一个流依赖另一个流数据的人的信息。

解决方法很好,并且我可以很好地使用rxdart Rx.combineLatest2。请见下文。现在,Riverpod很高兴,提供我的联合流的状态。我需要的连接ID现在是我的Account模型的一部分,感谢rxdart。希望这对某些人有所帮助。

final accountInfoStreamProvider = StreamProvider<Account>((ref) {
  final database = ref.watch(databaseProvider);
  return database != null
      ? AccountInfoModel(database: database).accountInfoStream()
      : Stream.value(null);
});

class AccountInfoModel {
  AccountInfoModel({@required this.database});
  final FirestoreDatabase database;

  Stream<Account> accountInfoStream() {
    return Rx.combineLatest2(
      database.accountStream(),
      database.connectionsStream(),
      (Account account, List<Connections> connections) {
        connections.forEach((connection) {
          account.connections.add(connection.documentId);
        });
        return account;
      },
    );
  }
}

0
不确定这是否符合您的要求,但这是一个有效的例子:
Stream<int> periodicStream() {
  return Stream.periodic(const Duration(seconds: 1), (i) {
    return i;
  });
}

Stream<int> simpleStream(int i) async* {
    yield i;
}

final streamProvider1 = StreamProvider((ref) => periodicStream());
final streamProvider2 = StreamProvider.family((ref, int i) => simpleStream(i));

final streamProvider3 = StreamProvider<int>((ref) async* {
  final value1 = await ref.watch(streamProvider1.future);
  final value2 = await ref.watch(streamProvider2(value1).future);
  yield value2;
});

你会使用streamProvider3,并使用ref.watch()来监听。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接