RxJava - 等待Observable<List<Service>>中所有Observable<Boolean>完成

5

我将尝试创建一个管理器,监测 Obsevable<List> 中所有服务是否已经准备就绪。

这段代码的总体思路类似于下面的代码:

public class ServiceStuff {

    public interface Service {
        String getName();

        Observable<Boolean> monitorReady();
    }

    private BehaviorSubject<List<Service>> services = BehaviorSubject.createDefault(Collections.emptyList());

    ServiceStuff(List<Service> list) {
        services.onNext(list);
    }

    public void addService(Service service) {
        List<Service> newList = services.getValue();
        newList.add(service);
        services.onNext(newList);
    }

    public void removeService(Service service) {
        List<Service> newList = services.getValue();
        newList.remove(service);
        if (newList.remove(service)) {
            services.onNext(newList);
        }
    }

    public Observable<List<Service>> monitorServices() {
        return services.observeOn(AndroidSchedulers.mainThread());
    }

    public Observable<Boolean> monitorServicesReady() {
        return [If monitorReady() for all services return true, emit true, else false]
    }

}

如何在不阻塞UI线程的情况下实现monitorServicesReady()方法?应该可以在添加任何服务之前开始监视。


更新:

我现在尝试了cyroxis建议的解决方案,并进行了一些修改。 blockingIterable()会导致应用程序ANR,因为它永远不知道monitorServices()流何时结束。

public Observable<Boolean> monitorServicesReady() {
    Iterable<Observable<Boolean>> sources = monitorServices().flatMapIterable(x -> x).map(Service::monitorReady).blockingIterable();
    return Observable.combineLatest(sources, this::all);
}

private boolean all(Object[] values) {
    boolean result = true;
    for (boolean val : (Boolean[]) values) {
        result &= val;
    }
    return result;
}

你的意思是 monitorServicesReady() 不应该阻塞UI线程吗? - Saeed Masoumi
@SaeedMasoumi 是的,我已经更新了问题。我更倾向于不使用任何 blockingGet() 来解决这个问题 :) - MisseMask
“服务”列表可能会随时间变化而改变,对吗?这是为什么它在主题中的原因吗? - skywall
原来 combineLatest 需要 List<Observable<Boolean>>,而你的 combiner(例如 all)需要 Object[] 类型,不能使用 Boolean[],所以你必须在方法内部进行类型转换。 - cyroxis
@cyroxis 问题在于 combineLatest(sources, this::all) 中的第一个参数必须是 Iterable<Observable<Boolean>>。但是我不想使用 blockingIterable(),因为那会阻塞 UI 线程,对吧?或者调用 monitorServicesReady() 的人会注意到这一点吗? - MisseMask
显示剩余3条评论
2个回答

0

你需要完成两个步骤。

  1. 从你的“Service”对象中获取所有Observable<Boolean>的列表(或可观察对象)
  2. 对第一步得到的集合进行“combine latest”操作

代码示例:

private boolean all(Boolean[] values) {
  boolean result = true;
  for(boolean val : values) {
    result &= val;
  }
  return result;
}


public Observable<Boolean> monitorServicesReady() {
  Observable<Observable<Boolean>> sources = monitorServices.flatMapIterable(x -> x)
   .map(servie -> servies.monitorReady());

  return Observable.combineLatest(sources, this::all); 
}

注意:根据我的观察,您应该放弃使用BehaviorSubject,因为您的构造函数提供了一组固定的服务列表,只需将其保存在您的类中作为List即可。除非真正需要,否则应避免使用BehaviorSubject


我理解你的想法!只是似乎无法正确使用 combineLatest 的语法... - MisseMask
@Misklahr,你能否发布一个更新,说明你卡在哪里了吗? - cyroxis

0

你需要让你的服务也实现 Observable 接口。 然后你就可以使用 Merge 操作符,它也是一个 Observable,当所有服务都准备好时触发。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接