使用OkHttp、Okio和RxJava下载文件

4
我将使用OkHttp下载文件并使用Okio写入磁盘。此外,我已经为此过程创建了一个Rx可观察对象。它可以工作,但是比我之前使用的(Koush的Ion库)明显慢。

以下是我创建可观察对象的方式:

public Observable<FilesWrapper> download(List<Thing> things) {
    return Observable.from(things)
        .map(thing -> {
            File file = new File(getExternalCacheDir() + File.separator + thing.getName());

            if (!file.exists()) {
                Request request = new Request.Builder().url(thing.getUrl()).build();
                Response response;
                try {
                    response = client.newCall(request).execute();
                    if (!response.isSuccessful()) new IOException();
                    else {
                        BufferedSink sink = Okio.buffer(Okio.sink(file));
                        sink.writeAll(response.body().source());
                        sink.close();
                    }
                } catch (IOException e) {
                    new IOException();
                }
            }

            return file;
        })
        .toList()
        .map(files -> new FilesWrapper(files);
}

有人知道是什么导致了速度缓慢吗?或者我是否使用了不正确的操作符?

1
你之前在做什么?目前为止,你只能一次执行一个请求,而不能并行执行它们... - alexwen
2
您忘记了对于不成功的响应使用 throw - Jesse Wilson
1个回答

7
使用flatMap而不是map,可以让你并行执行下载操作:
public Observable<FilesWrapper> download(List<Thing> things) {
    return Observable.from(things)
            .flatMap(thing -> {
                File file = new File(getExternalCacheDir() + File.separator + thing.getName());
                if (file.exists()) {
                    return Observable.just(file);
                }

                final Observable<File> fileObservable = Observable.create(sub -> {
                    if (sub.isUnsubscribed()) {
                        return;
                    }

                    Request request = new Request.Builder().url(thing.getUrl()).build();

                    Response response;
                    try {
                        response = client.newCall(request).execute();
                        if (!response.isSuccessful()) { throw new IOException(); }
                    } catch (IOException io) {
                        throw OnErrorThrowable.from(OnErrorThrowable.addValueAsLastCause(io, thing));
                    }

                    if (!sub.isUnsubscribed()) {
                        try (BufferedSink sink = Okio.buffer(Okio.sink(file))) {
                            sink.writeAll(response.body().source());
                        } catch (IOException io) {
                            throw OnErrorThrowable.from(OnErrorThrowable.addValueAsLastCause(io, thing));
                        }
                        sub.onNext(file);
                        sub.onCompleted();
                    }

                });
                return fileObservable.subscribeOn(Schedulers.io());
            }, 5)
            .toList()
            .map(files -> new FilesWrapper(files));
}

我们使用flatMap的maxConcurrent来限制每个订阅者的同时请求数量。

2
你可能想要在flatMap中添加maxConcurrent值来限制并发网络调用的数量。 - akarnokd
1
如何最好地使用Rx与OkHttp的异步“enqueue”API? - Jesse Wilson
Jesse,你可以尝试在这里完成:https://gist.github.com/alexwen/4b337bc669509a696b5b - alexwen

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接