使用RxJava和Retrofit进行定期的HTTP请求

25

是否可能使用RxJava/RxAndroid和Retrofit执行定期的HTTP请求,以便每x秒更新数据?

目前我正在使用IntentService和递归Handler/Runnable,每隔x秒触发一次。 我想知道是否可以删除全部内容并让RxJava来处理请求。

final RestClient client = new RestClient();
final ApiService service = client.getApiService();

public interface ApiService {
    @GET("/athletes")
    public Observable<List<Athlete>> getAthletes();
}

service.getAthletes()
.retry(3)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<List<Athlete>>() {
    @Override
    public void call(List<Athlete> athletes) {
        // Handle Success
    }
}, new Action1<Throwable>() {
    @Override
    public void call(Throwable throwable) {
        // Handle Error
    }
});

编辑

最终我得到了以下代码,欢迎任何更新。

final Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());

obs = Observable.interval(30, TimeUnit.SECONDS, scheduler)
            .flatMap(tick -> service.getAthletes(1, 0l))
            // Performed on service.getAthletes() observable
            .subscribeOn(scheduler)
            .observeOn(AndroidSchedulers.mainThread())
            .doOnError(err -> Log.d("APP", "Error retrieving athletes: " + err.toString()))
            .retry()
            .flatMap(Observable::from)
            .filter(athlete -> {
                // Process Athlete here in the filter
                // Return true to always send messages but could filter them out too
                return true;
            });

public static void startUpdates() {
    if (sub != null) {
        sub = obs.subscribe(athlete -> {
            Log.d("APP", "Done updating athletes! ");
        });
    }
}

public static void stopUpdates() {
    sub.unsubscribe();
    sub = null;
}
1个回答

29

使用Observable.interval,并在flatMap内在单线程Scheduler上订阅以防止service.getAthletes()的重叠请求:

Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());
Observable.interval(x, TimeUnit.SECONDS)
.flatMap(n -> 
    service.getAthletes()
        .retry(3)
        .subscribeOn(scheduler))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<List<Athlete>>() {
        @Override
        public void call(List<Athlete> athletes) {
            // Handle Success
        }
    }, new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            // Handle Error
        }
    });

你能解释一下这里 flatMap 的作用吗?我使用的是 Java 7,所以 lambda 不适用。实现 Func1 时,我注意到它返回了一个 Observable。.subscribe() 返回一个 Subscription。所以只需返回 Observable 就可以了,没问题吧? - Jeremy Lyman
2
这是如何将Observable<Observable<T>>“展平”为Observable<T>的方法,因此是的,Func1应该具有签名Func1<Integer,Observable<List<Athlete>>>。顺便说一句,我希望您的API返回Observable<Athlete>,而不是Observable<List<Athlete>>,以获得更大的灵活性(如果需要,您可以通过添加.toList从service.getAthletes()获取Observable<List<Athlete>>)。 - Dave Moten
该服务并不完美,因此反序列化器会剥离出运动员列表并返回,而不是使用额外的模型。除此之外,我注意到即使之前的调用还没有响应,这种方法也会进行另一个调用。有没有一种方法可以设置它只在前一个响应后才进行后续调用? - Jeremy Lyman
也许可以使用 .delay(x, TimeUnit.SECONDS) 和 .repeat()?类似这样的代码:source.doOnNext(processAthletes).doOnError(logError()).delay(x, TimeUnit.SECONDS).repeat().subscribe()。 - Dave Moten
1
不一定。我已经更新了单线程调度器的使用。看看你觉得怎么样。 - Dave Moten
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接