为什么 doOnNext() 方法没有被调用?

3
下面的订阅者既没有调用onNext()也没有调用onCompleted()。我尝试使用doOnNext()/doOnTerminate()实现订阅者。我还尝试了doAfterTerminate()。我还尝试显式定义一个订阅者,但是它既没有调用onNext()也没有调用onCompleted()。
根据RxJS reduce doesn't continue,reduce()没有终止,所以我尝试添加take(1),但是没有成功。在同一个stackoverflow问题中,有人说问题可能是我的流从未关闭。除了take(1)之外,也许还有其他关闭流的方法,但我还不太了解ReactiveX。
根据Why is OnComplete not called in this code? (RxAndroid),可能是系列中的原始流没有终止。但是,如果我调用take(1),它应该会发出终止信号,我不明白为什么这会有影响。
基本上,为什么以下行不被执行?
System.out.println("doOnNext map.size()=" + map.size());

即使以下代码行被执行了98次:
map.put(e.getKey(), e.getValue());

JsonObjectObservableRequest.java

import com.android.volley.toolbox.JsonObjectRequest;
...
public class JsonObjectObservableRequest {

    public JsonObjectObservableRequest(int method, String url, JSONObject request) {
    jsonObjectRequest = new JsonObjectRequest(method, url, request, getResponseListener(),     getResponseErrorListener());
    }

    private Response.Listener<JSONObject> getResponseListener() {
        return new Response.Listener<JSONObject>() {
            @Override
            public void onResponse(JSONObject response) {
                publishSubject.onNext(Observable.just(response));
            }
        };
    }

    private Response.ErrorListener getResponseErrorListener() {
        return new Response.ErrorListener() {
            @Override
            public void onErrorResponse(VolleyError error) {
                Observable<JSONObject> myError = Observable.error(error);
                publishSubject.onNext(myError);
            }
        };
    }

    public JsonObjectRequest getJsonObjectRequest() {
        return jsonObjectRequest;
    }

    public Observable<JSONObject> getObservable() {
    return publishSubject.flatMap(new Func1<Observable<JSONObject>, Observable<    JSONObject>>() {
            @Override
            public Observable<JSONObject> call(Observable<JSONObject> jsonObjectObservable) {
                return jsonObjectObservable;
            }
        });
    }
}

JsonObjectObservableRequest 调用代码

import com.android.volley.Request;
import com.android.volley.RequestQueue;
...
    JsonObjectObservableRequest jsonObjectObservableRequest = new JsonObjectObservableRequest(Request.Method.GET, idURLString, null, keyId, key);
    Observable<JSONObject> jsonObjectObservable = jsonObjectObservableRequest.getObservable();

    jsonObjectObservable
            .map(json -> {
                try {
                    return NetworkAccountIdDatasource.parseIdJSON(json);
                } catch (JSONException e) {
                    e.printStackTrace();
                    return null;
                }
            })
            .flatMapIterable(x -> x)
            .map(s -> new AbstractMap.SimpleEntry<String, String>("Name of " + s, "short id for " + s.substring(4)))
            .reduce(new HashMap<String, String>(), (map, e) -> {
                map.put(e.getKey(), e.getValue());
                return map;
            })
            .take(1)
            .doOnNext(map -> {
                System.out.println("doOnNext map.size()=" + map.size());
            })
            .doOnTerminate(() -> {
                System.out.println("doOnTerminate");
            })
            .subscribe();

    final RequestQueue queue = Volley.newRequestQueue(context);
    queue.add(jsonObjectObservableRequest.getJsonObjectRequest());

1
为什么还在使用Volley,Retrofit2才是新的方式! - Remario
谢谢@Remario。你是怎么知道的? - Michael Osofsky
1
首先,它更简洁,更快(okHttp3),并使用Java注释处理器。我的意思是,现在几乎每个Android开发者都已经转换了。 - Remario
请阅读相关资料,你会喜欢它的! - Remario
更好的理由是,Retrofit2直接支持调用适配器,可以直接提供Observable。 - EpicPandaForce
2个回答

1

你正确地识别了问题,你没有在源网络Observable中调用onCompleted()(在JsonObjectObservableRequest类中),而take(1)也没有帮助,因为你将其放在reduce之前。
正如你可能已经理解的那样,reduce必须对具有有限发射次数的Observable进行操作,因为它在所有项目已发射时发出累积项目,并且因此依赖于onCompleted()事件以知道观察到的流何时结束。

  • 我认为在你的情况下,最好的方法是更改 JsonObjectObservableRequest 的操作方式,你不需要使用 Subject,而可以使用创建方法来包装回调函数 (你可以在这里看到我的答案 here,并阅读有关在回调世界和 RxJava 之间桥接的更多信息 here
  • 此外,你不需要发出一个包含某些内容的 Observable 然后将其 flatmap 成项目,你可以简单地使用 onNext() 发出该项,用 onError() 发出错误,实际上你正在隐藏错误并将它们转换为一次发射,这可能会使处理流中的错误更加困难。
  • 应该在调用 onNext() 后在 onResponse() 调用 onCompleted() 来信号完成,在错误的情况下,发出 onError 将通知终止流。
  • 另一个问题是取消请求,这似乎不是以这种方式处理的,你可以查看源代码以了解如何在包装回调调用时完成取消请求。

0
我找到了一种方法来调用doOnTerminate()和doOnNext()。将take(1)语句放在流处理的较高位置即可。这个想法是基于@yosriz的评论得出的。我希望我能理解为什么这样可以解决问题,但我不明白,因为我认为终止信号只会被传播,但我猜我需要学习更多关于ReactiveX /反应式函数编程等方面的知识。
JsonObjectObservableRequest调用代码
import com.android.volley.Request;
import com.android.volley.RequestQueue;
...
    JsonObjectObservableRequest jsonObjectObservableRequest = new JsonObjectObservableRequest(Request.Method.GET, idURLString, null, keyId, key);
    Observable<JSONObject> jsonObjectObservable = jsonObjectObservableRequest.getObservable();

jsonObjectObservable
        .map(json -> {
            try {
                return NetworkAccountIdDatasource.parseIdJSON(json);
            } catch (JSONException e) {
                e.printStackTrace();
                return null;
            }
        })
        .take(1) // SOLUTION: MOVED THIS UP FROM BELOW reduce()
        .flatMapIterable(x -> x)
        .map(s -> new AbstractMap.SimpleEntry<String, String>("Name of " + s, "short id for " + s.substring(4)))
        .reduce(new HashMap<String, String>(), (map, e) -> {
            map.put(e.getKey(), e.getValue());
            return map;
        })
        // .take(1) // SOLUTION: MOVE THIS ABOVE flatMapIterable() 
        .doOnNext(map -> {
            System.out.println("doOnNext map.size()=" + map.size());
        })
        .doOnTerminate(() -> {
            System.out.println("doOnTerminate");
        })
        .subscribe();

final RequestQueue queue = Volley.newRequestQueue(context);
queue.add(jsonObjectObservableRequest.getJsonObjectRequest());

此外,如果我将take(1)作为第一个操作放在第一个map()操作之前(调用parseIdJSON()的那个),它也可以工作。然而,如果我将take(1)作为reduce()之前的操作,它只能部分工作;好的是doOnNext()被调用了,但坏的是当我的数据集应该是98时,map.size()总是1。

1
你仔细读了我的答案吗?你在 JsonObjectObservableRequest 中没有调用 onCompleted,所以你的 Observable 永远不会结束(complete),因此 reduce 无法工作(因为它从未收到完成事件)。 - yosriz

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接