Rxjava和Volley请求

23

我的问题可能听起来有些愚蠢,但我只是从Asynktask转向RxJava。 那么:

是否可以使用RxJava Observable与Volley请求一起使用?这意味着,使用future requests。

我之所以问这个问题,是因为另一个httpClient如retrofit非常擅长使用RxJava,但我个人喜欢Volley,所以这可行吗?

编辑

根据第一个答案,我得出结论是可行的。

你能分享一个示例展示如何做到这一点吗?


1
使用RequestFuture和Observable.from来编写程序相关内容。 - njzk2
1
在Volley中,我通常使用回调来返回值,但是在这里,我不知道如何做到这一点。 - Max Pinto
2
我也正好在尝试弄清楚如何做到这一点。我正在阅读这篇文章:http://code.hootsuite.com/observing-observables-in-mobile-rxjava-for-android/ - Al Lelopath
谢谢,我刚刚阅读了这篇文章,非常有帮助。我会尝试一些代码。 - Max Pinto
1
@MaxPinto 在 Volley 中,RequestFuture 是将监听器封装为 future 的一种方式,您可以将其与 Rx 一起使用。 - njzk2
是的,我找到了答案,会展示我的代码来帮助其他人。 - Max Pinto
2个回答

13

这段代码可以与这个库一起使用

api 'com.netflix.rxjava:rxjava-android:0.16.1'

最后感谢您的回答,我找到了一个解决方案想要分享:

在我的情况下,我使用Activities,但对于Fragment应该是更或多或少相同的。

而且我想要在响应中获取JsonObject,但可能需要您自定义Volley实现。

 public class MyActivity extends BaseActivityWithoutReloadCustomer implements Observer<JSONObject>

{
private CompositeSubscription mCompositeSubscription = new CompositeSubscription();
private Activity act;

 /**
 * @use handle response from future request, in my case JsonObject.
 */
 private JSONObject getRouteData() throws ExecutionException, InterruptedException {
    RequestFuture<JSONObject> future = RequestFuture.newFuture();
    String Url=Tools.Get_Domain(act.getApplicationContext(), Global_vars.domain)+ PilotoWs.wsgetRoutesbyUser+ Uri.encode(routeId);
    final Request.Priority priority= Request.Priority.IMMEDIATE;
    Estratek_JSONObjectRequest req= new Estratek_JSONObjectRequest(Request.Method.GET, Url,future,future,act,priority);
    POStreet_controller.getInstance().addToRequestQueue(req);
    return future.get();
}

 /**
 *@use the observable, same type data Jsob Object
 */
 public Observable<JSONObject> newGetRouteData() {
    return Observable.defer(new Func0<Observable<JSONObject>>() {
        @Override
        public Observable<JSONObject> call() {
            Exception exception;
            try {
                return Observable.just(getRouteData());
            } catch (InterruptedException | ExecutionException e) {
                Log.e("routes", e.getMessage());
                return Observable.error(e);
            }
        }
    });
};

@Override
public void onCreate(Bundle instance) {
    super.onCreate(instance);
    setContentView(R.layout.yourLayout);
    act = this;

    /**
     * @condition: RxJava future request with volley
     */

     mCompositeSubscription.add(newGetRouteData()
            .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
            .subscribe(this));
 }


@Override
public void onCompleted() {
    System.out.println("Completed!");
}

@Override
public void onError(Throwable e) {
    VolleyError cause = (VolleyError) e.getCause();
    String s = new String(cause.networkResponse.data, Charset.forName("UTF-8"));
    Log.e("adf", s);
    Log.e("adf", cause.toString());
}

@Override
public void onNext(JSONObject json) {
    Log.d("ruta", json.toString());
}

对我而言,这就是有效的。希望能帮到某些人。

编辑 Estratek_JSONObjectRequest.java

public class Estratek_JSONObjectRequest extends JsonObjectRequest{
Activity Act;
Priority priority;

public Estratek_JSONObjectRequest(int method, String url,
                                  JSONObject jsonRequest, Listener<JSONObject> listener,
                                  ErrorListener errorListener,Activity act, Priority p) {
    super(method, url, jsonRequest, listener, errorListener);
    this.Act=act;
    this.priority=p;
}

public Estratek_JSONObjectRequest(int method, String url,
                                  Listener<JSONObject> listener,
                                  ErrorListener errorListener,Activity act, Priority p) {
    super(method, url, null, listener, errorListener);
    this.Act=act;
    this.priority=p;
}

@Override
public Map<String, String> getHeaders()  {
    HashMap<String, String> headers = new HashMap<String, String>();
    headers.put("Content-Type", "application/json; charset=utf-8");
    headers.put("Authorization", "Bearer "+Tools.mySomeBearerToken);
    return headers;
}

//it make posible send parameters into the body.
@Override
public Priority getPriority(){
    return priority;
}

protected Response<JSONObject> parseNetworkResponse(NetworkResponse response) {
    try {
        String je = new String(response.data, HttpHeaderParser.parseCharset(response.headers));
        if (je.equals("null")){
            je="{useInventAverage:0}";
            return Response.success(new JSONObject(je), HttpHeaderParser.parseCacheHeaders(response));
        }
        else
            return Response.success(new JSONObject(je), HttpHeaderParser.parseCacheHeaders(response));
    } catch (UnsupportedEncodingException var3) {
        return Response.error(new ParseError(var3));
    } catch (JSONException var4) {
        return Response.error(new ParseError(var4));
    }
}

这类似于Volley构造函数,但我自己制作了定制版本以发送一些头部信息,例如Bearer令牌、content-type、发送优先级等等。否则就是相同的。

对于RXJAVA2库,需要按照以下方式操作:

> build.gradle应该有类似于以下代码:

api "io.reactivex.rxjava2:rxandroid:2.0.2":

public class MainActivity extends AppCompatActivity {
CompositeDisposable         mCompositeDisposable = new CompositeDisposable();

@Override
public void onCreate(Bundle instancia) {
    super.onCreate(instancia);
    setContentView(R.layout.sale_orders_list);

    // disposable that will be used to subscribe
    DisposableSubscriber<JSONObject> d = new DisposableSubscriber<JSONObject>() {
        @Override
        public void onNext(JSONObject jsonObject) {
            onResponseVolley(jsonObject);
        }
        @Override
        public void onError(Throwable t) {
            // todo
        }

        @Override
        public void onComplete() {
            System.out.println("Success!");
        }
    };

    newGetRouteData()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(d);
}

@Override
public void onDestroy(){
    super.onDestroy();
    /**
     * @use: unSubscribe to Get Routes
     */
    if (mCompositeDisposable != null){
        mCompositeDisposable.clear();
    }
}


/**
 * @condition: RxJava future request with volley
 */
private JSONObject getRouteData() throws ExecutionException, InterruptedException,RuntimeException {
    RequestFuture<JSONObject> future = RequestFuture.newFuture();
    String Url = "https//miapirest.com/api";
    JSONObjectRequest req= new JSONObjectRequest(Request.Method.GET, Url,future,future,act,priority);


    VolleyInstance.addToRequestQueue(req);

    return future.get();
}

/**
 * @condition: this function create a new Observable object and return that if success or
 */
public Flowable<JSONObject> newGetRouteData() {
    return Flowable.defer(new Callable<Publisher<? extends JSONObject>>() {
        @Override
        public Publisher<? extends JSONObject> call() throws Exception {
            return Flowable.just(getRouteData());
        }
    });
};

}


1
您能发布 Estratek_JSONObjectRequest 或者它的一些通用形式吗? - Al Lelopath
我刚刚添加了几分钟前。 - Max Pinto
parseNetworkResponse 中的 "je" 是什么意思?这只是针对您的数据的特定内容吗? - Al Lelopath
是的,它是具体返回,在您的情况下,您可以像正常一样返回。 - Max Pinto
你的代码有个小错误~ 如果遇到类似于Volley创建的authfail exception这样的异常,我的程序会抛出一个错误, 我修复了catch块,像这样:try { return Observable.just(future.get()); } catch (InterruptedException e) { return Observable.error(e); } catch (ExecutionException e) { return Observable.error(e.getCause()); } - user998953
你说得对,我也找到了Ruta,但我还没有编辑它。谢谢。 - Max Pinto

5
好消息是,张涛先生 kymjs 将Google Volley改进为 RxVolley,移除了HttpClient和RxJava的支持。

RxVolley = Volley + RxJava + OkHttp

完整文档可在 http://rxvolley.mydoc.io/ 上找到。
P.S:我正在考虑如何使用RxVolley支持多个自定义JSON转换器,就像retrofit一样!

另外还有一个稍微扩展了一点的版本,包括转换器:https://github.com/apptik/jus - kalin
@djodjo 谢谢分享! - LOG_TAG
为什么移除了httpClient? - M.ArslanKhan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接