Rxjava Android如何使用Zip操作符

62
我在我的 Android 项目中使用 RxJava 遇到了很多理解 zip 运算符的困难。
问题: 我需要发送网络请求上传视频,然后需要发送另一个网络请求来上传与之相配的图片;最后,我需要添加描述,并使用前两个请求的响应将视频和图片的位置 URL 以及描述上传到我的服务器。
我认为 zip 运算符非常适合这个任务,因为我可以使用两个可观察对象(视频和图片请求)的响应完成我的最终任务。但是实现起来似乎有些棘手。
我想请人用一些伪代码概念性地回答如何完成这个任务。谢谢!
8个回答

75

Zip操作符会严格地将可观察对象发出的项配对。它会等待两个(或更多)项到达,然后将它们合并。因此,这对您的需求是合适的。

我会使用Func2将前两个可观察对象的结果链接起来。 请注意,如果使用Retrofit,此方法会更简单,因为其API接口可能会返回一个可观察对象。否则,您需要创建自己的可观察对象。

// assuming each observable returns response in the form of String
Observable<String> movOb = Observable.create(...);
// if you use Retrofit
Observable<String> picOb = RetrofitApiManager.getService().uploadPic(...),
Observable.zip(movOb, picOb, new Func2<String, String, MyResult>() {
      @Override
      public MyResult call(String movieUploadResponse, String picUploadResponse) {
          // analyze both responses, upload them to another server
          // and return this method with a MyResult type
          return myResult;
      }
   }
)
// continue chaining this observable with subscriber
// or use it for something else

我遇到的问题是,我有一个视图,我在其中触发了video observable的任务,还有另一个视图,用于picture observable,而另一个仍然需要将这两个结果一起使用作为最终observable。zip方法是否会返回已执行的observable的结果? - feilong
我认为你不能执行一个observable,但我理解你的意思。Observable不会运行,直到有订阅者附加到它上面。因此,在附加订阅者之前,整个observable安排必须完成。 - inmyth
1
我同意你的观点,即“zip操作符允许您从两个不同的可观察结果中组合一个结果”。现在我想让observable 2依赖于observable 1,因此observable 1应该在observable 2之前执行,然后我需要将两个observable的结果组合起来。我们有任何操作符可以做到这一点吗?zip能完成这项工作吗?我不想使用flatMap,因为它会将一个流转换为另一个流,但是在这里我需要设置依赖关系,然后压缩结果。请回复。 - Sagar Trehan

37

一个小例子:

val observableOne = Observable.just("Hello", "World")
val observableTwo = Observable.just("Bye", "Friends")
val zipper = BiFunction<String, String, String> { first, second -> "$first - $second" }
Observable.zip(observableOne, observableTwo, zipper)
  .subscribe { println(it) }

这将打印:

Hello - Bye
World - Friends

BiFunction<String, String, String>表示第一个String是第一个observable的类型,第二个String是第二个observable的类型,第三个String代表你的zipper函数返回的类型。


我在这篇博客文章中制作了一个小例子,使用zip调用了两个真实的端点。


Func2<...> 中,这三个字符串是什么?是为了 s-s2 吗? - chornge
1
我刚刚更新了答案以处理RxJava2方法,BiFunction中的第一个String是第一个observable的类型,第二个String是第二个observable的类型,第三个String是它将生成的observable的类型! - Evin1_

4

我这里有一个使用异步方式完成的Zip例子,以防你感到好奇

      /**
 * Since every observable into the zip is created to subscribeOn a diferent thread, it´s means all of them will run in parallel.
 * By default Rx is not async, only if you explicitly use subscribeOn.
  */
@Test
public void testAsyncZip() {
    scheduler = Schedulers.newThread();
    scheduler1 = Schedulers.newThread();
    scheduler2 = Schedulers.newThread();
    long start = System.currentTimeMillis();
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
                                                                                        .concat(s3))
              .subscribe(result -> showResult("Async in:", start, result));
}

/**
 * In this example the the three observables will be emitted sequentially and the three items will be passed to the pipeline
 */
@Test
public void testZip() {
    long start = System.currentTimeMillis();
    Observable.zip(obString(), obString1(), obString2(), (s, s2, s3) -> s.concat(s2)
                                                                         .concat(s3))
              .subscribe(result -> showResult("Sync in:", start, result));
}


public void showResult(String transactionType, long start, String result) {
    System.out.println(result + " " +
                               transactionType + String.valueOf(System.currentTimeMillis() - start));
}

public Observable<String> obString() {
    return Observable.just("")
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> "Hello");
}

public Observable<String> obString1() {
    return Observable.just("")
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> " World");
}

public Observable<String> obString2() {
    return Observable.just("")
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> "!");
}

public Observable<String> obAsyncString() {
    return Observable.just("")
                     .observeOn(scheduler)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> "Hello");
}

public Observable<String> obAsyncString1() {
    return Observable.just("")
                     .observeOn(scheduler1)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> " World");
}

public Observable<String> obAsyncString2() {
    return Observable.just("")
                     .observeOn(scheduler2)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> "!");
}

您可以在此处查看更多示例:https://github.com/politrons/reactive

应该也要发布结果。 - Prakhar Kulshreshtha

2

zip 操作符允许您从两个不同的可观察对象的结果中组合一个结果。

您需要提供一个 lambda 函数,该函数将根据每个可观察对象发出的数据创建一个结果。

Observable<MovieResponse> movies = ...
Observable<PictureResponse> picture = ...

Observable<Response> response = movies.zipWith(picture, (movie, pic) -> {
        return new Response("description", movie.getName(), pic.getUrl());

});

我同意你的观点,即“zip操作符允许您从两个不同的可观察结果中组合一个结果”。现在我想让observable 2依赖于observable 1,因此observable 1应该在observable 2之前执行,然后我需要将两个observable的结果组合起来。我们有任何操作符可以做到这一点吗?zip能完成这项工作吗?我不想使用flatMap,因为它会将一个流转换为另一个流,但是在这里我需要设置依赖关系,然后压缩结果。请回复。 - Sagar Trehan

2

这是我的实现,使用了 Single.ziprxJava2

我尽可能地让它易于理解

//
// API Client Interface
//
@GET(ServicesConstants.API_PREFIX + "questions/{id}/")
Single<Response<ResponseGeneric<List<ResponseQuestion>>>> getBaseQuestions(@Path("id") int personId);

@GET(ServicesConstants.API_PREFIX + "physician/{id}/")
Single<Response<ResponseGeneric<List<ResponsePhysician>>>> getPhysicianInfo(@Path("id") int personId);

//
// API middle layer - NOTE: I had feedback that the Single.create is not needed (but I haven't yet spent the time to improve it)
//
public Single<List<ResponsePhysician>> getPhysicianInfo(int personId) {
    return Single.create(subscriber -> {
        apiClient.getPhysicianInfo(appId)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(response -> {
                    ResponseGeneric<List<ResponsePhysician>> responseBody = response.body();
                    if(responseBody != null && responseBody.statusCode == 1) {
                        if (!subscriber.isDisposed()) subscriber.onSuccess(responseBody.data);
                    } else if(response.body() != null && response.body().status != null ){
                        if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.body().status));
                    } else {
                        if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.message()));
                    }
                }, throwable -> {
                    throwable.printStackTrace();
                    if(!subscriber.isDisposed()) subscriber.onError(throwable);
                });
    });
}

public Single<List<ResponseQuestion>> getHealthQuestions(int personId){
    return Single.create(subscriber -> {
        apiClient.getBaseQuestions(personId)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(response -> {
                    ResponseGeneric<List<ResponseQuestion>> responseBody = response.body();
                    if(responseBody != null && responseBody.data != null) {
                        if (!subscriber.isDisposed()) subscriber.onSuccess(response.body().data);
                    } else if(response.body() != null && response.body().status != null ){
                        if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.body().status));
                    } else {
                        if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.message()));
                    }
                }, throwable -> {
                    throwable.printStackTrace();
                    if(!subscriber.isDisposed()) subscriber.onError(throwable);
                });
    });
}

//please note that ResponseGeneric is just an outer wrapper of the returned data - common to all API's in this project

public class ResponseGeneric<T> {

    @SerializedName("Status")
    public String status;

    @SerializedName("StatusCode")
    public float statusCode;

    @SerializedName("Data")
    public T data;
}

//
// API end-use layer - this gets close to the UI so notice the oberver is set for main thread
//
private static class MergedResponse{// this is just a POJO to store all the responses in one object
    public List<ResponseQuestion> listQuestions;
    public List<ResponsePhysician> listPhysicians;
    public MergedResponse(List<ResponseQuestion> listQuestions, List<ResponsePhysician> listPhysicians){
        this.listQuestions = listQuestions;
        this.listPhysicians = listPhysicians;
    }
}

// example of Single.zip() - calls getHealthQuestions() and getPhysicianInfo() from API Middle Layer
private void downloadHealthQuestions(int personId) {
    addRxSubscription(Single
            .zip(getHealthQuestions(personId), getPhysicianInfo(personId), MergedResponse::new)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(response -> {
                if(response != null) {
                    Timber.i(" - total health questions downloaded %d", response.listQuestions.size());
                    Timber.i(" - physicians downloaded %d", response.listPhysicians.size());

                    if (response.listPhysicians != null && response.listPhysicians.size()>0) {
                        // do your stuff to process response data
                    }

                    if (response.listQuestions != null && response.listQuestions.size()>0) {

                        // do your stuff to process response data
                    }


                } else {
                    // process error - show message
                }
            }, error -> {
                // process error - show network error message
            }));
}

2

我一直在寻找一个简单的答案,关于如何使用Zip操作符以及如何处理创建的Observables以将它们传递给Zip操作符。我想知道是否应该为每个Observable调用subscribe()方法。但是没有一个简单的答案可以找到,我不得不自己摸索。下面是一个使用Zip操作符处理两个Observables的简单示例:

@Test
public void zipOperator() throws Exception {

    List<Integer> indexes = Arrays.asList(0, 1, 2, 3, 4);
    List<String> letters = Arrays.asList("a", "b", "c", "d", "e");

    Observable<Integer> indexesObservable = Observable.fromIterable(indexes);

    Observable<String> lettersObservable = Observable.fromIterable(letters);

    Observable.zip(indexesObservable, lettersObservable, mergeEmittedItems())
            .subscribe(printMergedItems());
}

@NonNull
private BiFunction<Integer, String, String> mergeEmittedItems() {
    return new BiFunction<Integer, String, String>() {
        @Override
        public String apply(Integer index, String letter) throws Exception {
            return "[" + index + "] " + letter;
        }
    };
}

@NonNull
private Consumer<String> printMergedItems() {
    return new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    };
}

打印结果为:
[0] a
[1] b
[2] c
[3] d
[4] e

我脑海中的问题的最终答案如下:
传递给zip()方法的Observables只需要创建即可,它们不需要任何订阅者,仅仅创建就足够了......如果你想让任何Observable在调度程序上运行,可以为该Observable指定这个功能......我还尝试在Observables上使用zip()操作符,它们应该等待结果,当两个结果都准备好时,zip()的消费者才会被触发(这是预期的行为)。

我有两个Observable列表(http请求)。第一个列表包含创建项目的请求,第二个列表包含更新项目的请求。当所有请求都结束时,我想执行“某些操作”。我想使用zip,但是通过您的示例,我发现它是将请求成对分组,而我只想在结束时得到通知(这两个列表的大小不同)。你能给我一些建议吗?谢谢。 - JCarlosR
1
请查看以下内容:http://reactivex.io/documentation/operators/combinelatest.html - Ahmed Adel Ismail

1

你使用 Java 8 中的 rxjavazip

Observable<MovieResponse> movies = ...
Observable<PictureResponse> picture = ...

Observable<ZipResponse> response = Observable.zip(movies, picture, ZipResponse::new);

class ZipResponse {
        private MovieResponse movieResponse;
        private PictureResponse pictureResponse;

        ZipResponse(MovieResponse movieResponse, PictureResponse pictureResponse) {
             this.movieResponse = movieResponse;
             this.pictureResponse = pictureResponse;
        }

        public MovieResponse getMovieResponse() {
             return movieResponse;
        }

        public void setMovieResponse(MovieResponse movieResponse) {
            this.movieResponse= movieResponse;
        }

        public PictureResponse getPictureResponse() {
             return pictureResponse;
        }

        public void setPictureResponse(PictureResponse pictureResponse) {
            this.pictureResponse= pictureResponse;
        }
}

0
你可以在Observable链中使用.zipWith操作符。
如果uploadMovies()uploadPictures()返回Observable,
uploadMovies()
    .zipWith(uploadPictures()) { m, p ->
        "$m with $p were uploaded"
    }
    .subscribe { print(it) }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接