RXJava - 线程优先级

3

我正在使用以下代码获取我的订阅的ExecutorService

public static ExecutorService getThreadPoolExecutorService(int threads)
{
    int NUMBER_OF_CORES = Runtime.getRuntime().availableProcessors();
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            NUMBER_OF_CORES * 2,
            NUMBER_OF_CORES * 2,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>()
    );
    return Executors.newFixedThreadPool(threads, threadPoolExecutor.getThreadFactory());
}

然后像下面这样使用它:

public static Scheduler getBackgroundSchedular()
{
    if (mBackgroundExecutorService == null)
        mBackgroundExecutorService = ThreadUtil.getThreadPoolExecutorService(4);
    return Schedulers.from(mBackgroundExecutorService);
}

我使用这个调度器来让我的可观察对象在后台运行。

问题

如何为RXJava使用PriorityBlockingQueue?通常,我会使用一些特殊的可运行对象来实现比较函数,并使用相应的比较函数替换上面示例中的LinkedBlockingQueue,但是对于RXJava可观察对象,我该如何做呢?

1个回答

3
在RxJava中,调度器与数据流是正交的,它们根本不知道数据流的存在。调度器只执行一个Runnable实例,而所有由操作符创建的Runnables都是平等的。因此,在Schedulers中使用优先级队列是没有意义的。
此外,数据流是顺序的,除非有一些边界(如线程交换),否则它们会保持其顺序。除非你手动收集事件并进行排序,否则具有优先级队列的操作符可能会以你不希望的方式重新排序数据。
编辑:
有点不寻常,但你也可以将PriorityBlockingQueue放在主题和一些操作符上,这样你就可以用任务来填充一个序列。
PriorityBlockingQueue<Task> q = ...

Subject<Integer, Integer> subject = PublishSubject.<Integer>create().toSerialized();

subject
.map(v -> q.poll())
.doOnNext(v -> v.execute())
.subscribe();

q.offer(new Task(...));
subject.onNext(1);

考虑以下示例:我有一个订阅者,它从SD卡中加载所有文件夹及其内容。一旦我知道它们的名称,我就会立即显示这些文件夹,但是可观察对象仍将在后台继续工作。当我单击未加载的文件夹时,我创建第二个可观察对象,仅加载此单个文件夹的内容=>我希望此线程比主可观察对象的线程具有更高的优先级。取消订阅主可观察对象是一种选择,但我仍需要其中的数据,因此取消订阅不是最佳选择... - prom85
使用IO调度程序来运行单个目录并让操作系统进行仲裁有什么问题吗? - akarnokd
当我首次取消订阅主要的可观察对象时,我可以看到显著的速度提升... 我想检查一下单个文件夹可观察对象的优先级是否有相同的效果... - prom85

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接