RxJava内部如何产生背压问题

3

我一直在阅读RxJava中关于背压的一些文档,但是我没有找到像内部库中发生的详细说明。每个人都只是概括地说“生产者”太快而“消费者”太慢。

例如下面的代码:

Observable.interval(1, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.newThread())
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println);

我一直在研究 RxJava 的源代码,我的理解是,在主线程中,我们每毫秒都会发出事件,一旦发出,我们将值传递给 System.out.println(i) 方法,并将其抛到 newThread 调度程序的线程池中并在可运行的方法内运行。
所以我的问题是,异常是如何在内部发生的?因为当我们调用 Thread.sleep() 时,我们只是让处理方法调用的线程 -> System.out.println() 进入睡眠状态,而不影响线程池中的其他线程,那么它怎么会引起异常呢?是因为线程池没有足够的可用线程了吗?
谢谢
1个回答

8
您可以将背压看作是一个许可证系统,其中一个操作符向其上游源颁发许可证:您可以给我128个元素。稍后该操作符可能会说“好的,再给我96个”,因此总共可能有224个未完成的许可证。某些来源(如interval)不关心许可证,只是按周期性地分发值。由于许可证数量通常与队列或缓冲区中的可用容量密切相关,分配超过这些存储器容量的内容会产生MissingBackpressureException异常。
主要通过检测有界队列的offer返回false来检测背压违规情况,例如在observeOn中指示队列已满。
检测违规的第二种方法是通过跟踪操作符中的未完成许可证数量来进行的,例如onBackpressureDrop,每当上游发送的许可证数量超过该数量时,操作符将不会转发它。
// in onBackpressureDrop
public void onNext(T value) {
    if (emitted != availablePermits) {
        emitted++;
        child.onNext(value);
    } else {
        // ignoring this value
    }
}

子订阅者通过request()信号其许可,通常会导致onBackpressureDrop中类似以下的情况:

public void childRequested(long n) {
    availablePermits += n;
}

在实际应用中,由于可能存在异步执行的情况,availablePermits 是一个 AtomicLong 类型的变量(并被称为 requested)。


你是在说RxJava中的某些操作符会将onNext()事件放入队列或缓冲区数据结构中,对吗?所以异常不是由线程池引起的,对吧? - Qing

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接