除了第一次外,延迟RxJava Android的执行

3

我正在进行异步调用,在1分钟内每隔10秒钟大约会进行6次调用,但问题是我想在特定的条件上应用延迟

Observable
.just(listOfSomethings_Locally)
.take(1, TimeUnit.MINUTES)
.serialize()
.delaySubscription( // this is confusing part 
() ->
    Observable.just(listOfItems_Network).take(10,TimeUnit.SECONDS)
) 

我想做的是延迟网络调用10秒钟,除了第一次外,然后在10秒后取消网络调用,这样我应该在1分钟内得到6个精确的调用。
编辑:
由于场景不清晰,重新定义场景:
我有一个大型的本地驱动程序列表,我想每隔10秒向它们发送请求,并侦听另一个订阅者以检查是否在10秒内取消了驱动程序。这个过程将持续约1分钟,如果一个驱动程序被取消,我应该立即向下一个驱动程序发送请求。
目前为止编写的代码:
Observable.from(driversGot)
                .take(1,TimeUnit.MINUTES)
                .serialize()
                .map(this::requestRydeObservable) // requesting for single driver from driversGot (it's a network call)
                .flatMap(dif ->
                        Observable.amb(
                                kh.getFCM().driverCanceledRyde(), // listen for if driver cancel request returns integer
                                kh.getFCM().userRydeAccepted()) // listen for driver accept returns RydeAccepted object
                                .map(o -> {
                                    if (o instanceof Integer) {
                                        return new RydeAccepted();
                                    } else if (o instanceof RydeAccepted) {
                                        return (RydeAccepted) o;
                                    }
                                    return null;
                                }).delaySubscription(10,TimeUnit.SECONDS)
                )
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(fua -> {
                    if (fua == null) {
                        UiHelpers.showToast(context, "Invalid Firebase response");
                    } else if (!fua.getStatus()) { // ryde is canceled because object is empty
                        UiHelpers.showToast(context, "User canceled ryde");
                    } else { // ryde is accepted
                        UiHelpers.showToast(context, "User accepted ryde");
                    }
                }, t -> {
                    t.printStackTrace();
                    UiHelpers.showToast(context,"Error sending driver requests");
                }, UiHelpers::stopLoading);

请提供有关您的可观察对象和用例的更详细信息。根据我的理解:您获得了一个项目列表,将被转换为可观察对象。您想要逐个处理已推送给您的元素。这些值必须在每10秒内安排到您的位置。例如:第0秒:值1,第10秒:值2。每个发出的值都将通过Web调用进行处理。您会检查另一个可观察对象,以确定该项目是否在10秒内被取消。如果没有,在一个元素的处理过程中需要60秒。如果在10秒内取消,则开始下一个排队的元素? - Sergej Isbrecht
你完全理解了流程,只是需要注意所有列表元素的总限时为60秒。 - Zulqurnain Jutt
所以,你的列表里有6个元素,或者你希望在60秒内完成,如果你每10秒安排一个元素? - Sergej Isbrecht
等等,以10秒为间隔安排并不合理。我想您希望安排一个动作,如果在10秒内未能得到结果,则将其取消并安排下一个动作。如果在10秒内得到了结果,则继续安排下一个动作? - Sergej Isbrecht
Drivercancel和Rydeaccepted只是可观察的,可能会发出值,如果它们中的任何一个被调用,Drivercancel将被调用,下一个将被安排,如果Rydeaccepted被调用,则所有请求都将被取消并执行某些操作。 - Zulqurnain Jutt
显示剩余3条评论
3个回答

2

您的代码反馈

您不需要使用takeserialize,因为just已经可以立即+连续地发出内容。

delaySubscription似乎是一个奇怪的选择,因为在传递的可观察对象生成事件后,后续事件不会被延迟(这与您的情况相矛盾) delaySubscription

选项#1,仅使用rx

使用delay+计算其余事件的单独延迟时间(因此第一个事件延迟0秒,第二个事件延迟1秒,第三个事件延迟3秒,...)

            AtomicLong counter = new AtomicLong(0);
    System.out.println(new Date());
    Observable.just("1", "2", "3", "4", "5", "6")
        .delay(item -> Observable.just(item).delay(counter.getAndIncrement(), TimeUnit.SECONDS))
        .subscribe(new Consumer<String>() {
            public void accept(String result) throws Exception {
                System.out.println(result + " " + new Date());
            }
        });        
        System.in.read();

选项2:速率限制

看起来您的使用情况适用于速率限制,因此我们可以使用Guava的RateLimiter:

            RateLimiter limiter = RateLimiter.create(1);
    System.out.println(new Date());
    Observable.just("1", "2", "3", "4", "5", "6")
        .map(r -> {
            limiter.acquire();
            return r;
        })
        .subscribe(new Consumer<String>() {
            public void accept(String result) throws Exception {
                System.out.println(result + " " + new Date());
            }
        });        
        System.in.read();

两者的工作方式相似:

Tue Nov 15 11:14:34 EET 2016
1 Tue Nov 15 11:14:34 EET 2016
2 Tue Nov 15 11:14:35 EET 2016
3 Tue Nov 15 11:14:36 EET 2016
4 Tue Nov 15 11:14:37 EET 2016
5 Tue Nov 15 11:14:38 EET 2016
6 Tue Nov 15 11:14:39 EET 2016

如果您的请求需要处理5秒钟,那么速率限制器会更有效,然后它将允许下一个请求更快地进行以弥补延迟并在10秒内达到1req / s的目标。


在你的第一种选择中,如何限制请求为1分钟时间,因为每个请求在(订单号)-1秒后执行,但它并不符合我的问题,我有一个大型的本地司机列表,我想在每个司机之间每隔10秒发送请求,并监听另一个订阅者以检查司机是否在10秒内取消了请求,该过程将持续约1分钟,如果一个司机取消了,我应该立即发送请求给下一个司机。 - Zulqurnain Jutt

0

你好,你可以使用带有延迟的重试,这里有一种方法可以做到 这里


这里的情况有点棘手,提供的链接仅解释了 delay 的行为。 - Zulqurnain Jutt

0

我想更新@Ivans的帖子,因为它缺少错误处理并且使用了副作用。

这篇文章只会使用RxJava操作符。Observable将在每10秒提供一次值。请求将在10秒超时。如果达到超时时间,将返回一个备用值。

第一个测试方法将在60秒内接收10个值。如果最后一个请求提前完成,那么Observable可能会提前完成。

public class TimeOutTest {
    private static String DUMMY_VALUE = "ERROR";

    @Test
    public void handles_in_60_seconds() throws Exception {
        List<Integer> actions = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        Observable<Long> timer = Observable.interval(0, 10, TimeUnit.SECONDS).take(6);

        Observable<Integer> vales = Observable.fromIterable(actions)
                .take(6);

        Observable<Integer> observable = Observable.zip(timer, vales, (time, result) -> {
            return result;
        });

        Observable<String> stringObservable = observable.flatMap(integer -> {
            return longNetworkLong(9_000)
                    .timeout(10, TimeUnit.SECONDS)
                    .onErrorReturnItem(DUMMY_VALUE);
        }).doOnNext(s -> System.out.println("VALUE"));

        stringObservable.test()
                .awaitDone(60, TimeUnit.SECONDS)
                .assertValueCount(6);
    }

    @Test
    public void last_two_values_timeOut() throws Exception {
        List<Integer> actions = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        Observable<Long> timer = Observable.interval(0, 10, TimeUnit.SECONDS).take(6);

        Observable<Integer> vales = Observable.fromIterable(actions)
                .take(6);

        Observable<Integer> observable = Observable.zip(timer, vales, (time, result) -> {
            return result;
        });

        Observable<String> stringObservable = observable
                .map(integer -> integer * 2500)
                .flatMap(integer -> {
                    return longNetworkLong(integer)
                            .timeout(10, TimeUnit.SECONDS)
                            .doOnError(throwable -> System.out.print("Timeout hit?"))
                            .onErrorReturnItem(DUMMY_VALUE);
                })
                .doOnNext(s -> System.out.println("VALUE"))
                .filter(s -> !Objects.equals(s, DUMMY_VALUE));

        stringObservable.test()
                .awaitDone(60, TimeUnit.SECONDS)
                .assertValueCount(4);

    }

    private Observable<String> longNetworkLong(int delayTime) {
        return Observable.fromCallable(() -> {
            Thread.sleep(delayTime);
            return "result";
        });
    }
}

1
它接受一个列表并将其转换为可观察对象。还有另一个可观察对象,每隔10秒发出一个值。我从两个可观察对象中取6个值,并将它们合并在一起。因此,我将每隔10秒收到一个推送的值,从第0秒开始。对于每个值,我调用一个长时间运行的方法:longNetworkLong。如果longNetworkLong可观察对象的结果需要超过10秒才能返回,我将取消请求并提供一个备选值。我会看看您的新需求。 - Sergej Isbrecht

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接