使用ReactiveX for Java进行Http调用

8

我刚接触Java的ReactiveX,现在有一段代码块需要进行外部http调用但是没有异步操作。我们使用的是rxjava 1.2和Java 1.8。

  private ResponseEntity<String> callExternalUrl(String url, String json, HttpMethod method) {

    RestTemplate restTemplate;
    HttpEntity request;

      request = new HttpEntity(jsonContent, httpHeaders);

    return restTemplate.exchange(url, httpMethod, request, String.class);

  }

我找到了以下在线代码块,但我无法完全理解它以及如何将其应用于我的代码库。

private RxClient<RxObservableInvoker> httpClient;
public <T> Observable<T> fetchResult(String url, Func1<Response, T> mapper) {

    return httpClient.target(url)
        .request()
        .rx()
        .get()
        .subscribeOn(Schedulers.io())
        .map(mapper);
  }
1个回答

8
如果我理解您的意思正确,您需要像这样封装您现有的callExternalUrl
static Observable<String> callExternalUrlAsync(String url, String json, HttpMethod method)
{
    return Observable.fromCallable(() -> callExternalUrl(url, json, method))
            .subscribeOn(Schedulers.io())
            .flatMap(re -> {
                         if (re.hasBody())
                             return Observable.just(re.getBody());
                         else
                             return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
                     },
                     e -> Observable.error(e),
                     (Func0<Observable<? extends String>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
            .observeOn(Schedulers.computation());
}

代码简述:

  1. 它在 Schedulers.io 上安排现有的 callExternalUrl 的执行。
  2. ResponseEntity<T> 进行最小转换,以成功的 T 和错误情况。同样在 io 调度程序上进行,但由于非常短暂,所以不重要。(如果在 callExternalUrl 中出现异常,则原样传递。)
  3. 使订阅者的结果在 Schedulers.computation 上执行。

注意事项

  1. 您可能想为 subscribeOnobserveOn 使用自定义调度程序。
  2. 您可能希望在传递给 flatMap 的第一个 lambda 中使用更好的逻辑来区分成功和错误,并且绝对需要一些更具体的异常类型。

高阶魔法

如果您愿意使用高阶函数并为减少代码重复而牺牲一点性能,可以像这样做:

// Universal wrapper method
static <T> Observable<T> wrapCallExternalAsAsync(Func3<String, String, HttpMethod, ResponseEntity<T>> externalCall, String url, String json, HttpMethod method)
{
    return Observable.fromCallable(() -> externalCall.call(url, json, method))
            .subscribeOn(Schedulers.io())
            .flatMap(re -> {
                         if (re.hasBody())
                             return Observable.just(re.getBody());
                         else
                             return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
                     },
                     e -> Observable.error(e),
                     (Func0<Observable<? extends T>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
            .observeOn(Schedulers.computation());
}

static Observable<String> callExternalUrlAsync_HigherOrder(String url, String json, HttpMethod method)
{
    return wrapCallExternalAsAsync(MyClass::callExternalUrl, url, json, method);
}

MyClass 是您的 callExternalUrl 所在的位置。


更新(仅限异步调用)

private static RxClient httpClient = Rx.newClient(RxObservableInvoker.class); // 在此处您可能需要传递自定义的 ExecutorService

private <T> Observable<String> executeHttpAsync(String url, String httpMethod, Entity<T> entity) {
    return httpClient.target(url)
            .request()
            .headers(httpHeaders) // assuming httpHeaders is something global as in your example
            .rx()
            .method(httpMethod, entity)
            .map(resp -> {
                if (200 != resp.getStatus()) {
                    throw new RuntimeException("Bad status code " + resp.getStatus());
                } else {
                    if (!resp.hasEntity()) {
                        // return null; // or error?
                        throw new RuntimeException("Empty response"); // or empty?
                    } else {
                        try {
                            return resp.readEntity(String.class);
                        } catch (Exception ex) {
                            throw new RuntimeException(ex); // wrap exception into unchecked
                        }
                    }
                }
            })
            .observeOn(Schedulers.computation());
}

private Observable<String> executeGetAsync(String url) {
    return executeHttpAsync(url, "GET", null);
}

private Observable<String> executePostAsync(String url, String json) {
    return executeHttpAsync(url, "POST", Entity.json(json));
}

同样适用类似的警告:

  1. 您可能想要为newClient调用和observeOn都使用自定义调度程序。
  2. 您可能希望有一些更好的错误处理逻辑,而不仅仅是检查它是否为HTTP 200,而且肯定需要一些更具体的异常类型。但这都是业务逻辑特定的,所以由您决定。

此外,从您的示例中无法清楚地了解请求正文(HttpEntity)的实际构建方式,以及您是否总是希望将String作为响应,就像在原始示例中一样。尽管如此,我只是按原样复制了您的逻辑。如果您需要其他内容,您可能应该参考https://jersey.java.net/documentation/2.25/media.html#json上的文档。


感谢您的回复。我可以尝试您所做的事情,但我忘记了我们需要使用RxClient <RxObservableInvoker> httpClient; - WowBow
再次感谢。我会试一下并让您知道结果。从我的代码中可以看出,httpEntity是按以下方式填充的:HttpEntity request = new HttpEntity(jsonContent, httpHeaders); 调用者方法应提供JSON内容和标头。关于响应,restTemplate.exchange(..)返回ResponseEntity <String>,我必须浏览字符串以获取所需内容。这就是为什么我希望将响应作为ResponseEntity <String>返回。如果有更好的方法,我很乐意尝试。 - WowBow
@SerGr 不确定这里使用实体的用途。我也不确定该传递什么作为“实体”。同样的事情也适用于“T”。在这里通用化的目的是什么?也许你可以向我展示如何调用此方法并在调用者方法中使用结果。假设除了 Entity<T> 之外,所有其他变量都已经有一些数据?谢谢。 - WowBow
1
@WowBow,所以现在你遇到了一个不同的问题,当你发送POST请求时,会收到HTTP 411错误“请求必须分块或具有内容长度”?我不知道为什么会发生这种情况,在我的本地测试中,我可以看到有效的Content-Length作为POST请求的一部分发送。我认为你需要展示你当前的实际代码,可能甚至需要创建一个新的问题,因为它对于评论来说太复杂了。没有最小、完整和可验证的示例,很难猜测。 - SergGr
我在这里发布了我的新问题,请帮忙。http://stackoverflow.com/questions/43242926/how-to-add-a-parameter-to-rxclient-web-request - WowBow
显示剩余16条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接