RxJava 观察在调用/订阅线程上

44

我有些困难理解RxJava中的subscribeOn/observeOn如何工作。我创建了一个简单的应用程序,其中observable发出太阳系行星名称,进行一些映射和过滤并打印结果。

据我所知,通过subscribeOn运算符将工作调度到后台线程(并且似乎运作良好)。

使用observeOn运算符在后台线程上进行观察也可以正常工作。

但是,我不明白如何在调用线程上观察(无论它是主线程还是其他线程)。在Android上很容易通过AndroidSchedulers.mainThread()运算符实现,但我不知道如何在纯java中实现这一点。

这是我的代码:

public class Main {

    public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        System.out.println("Main thread: " + getCurrentThreadInfo());

        Observable<String> stringObservable = Observable.from(Arrays.asList("Merkury", "Wenus", "Ziemia", "Mars", "Jowisz", "Saturn", "Uran", "Neptun", "Pluton"))
                .map(in -> {
                    System.out.println("map on: " + getCurrentThreadInfo());
                    return in.toUpperCase();
                })
                .filter(in -> {
                    System.out.println("filter on: " + getCurrentThreadInfo());
                    return in.contains("A");
                })
                .subscribeOn(Schedulers.from(executor));

        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread("Thread-" + i) {
                @Override
                public void run() {
                    stringObservable
                            .buffer(5)
                            .subscribe(s -> System.out.println("Result " + s + " on: " + getCurrentThreadInfo()));
                }
            };
            thread.start();
        }

    }

    private static String getCurrentThreadInfo() {
        return Thread.currentThread().getName() + "(" + Thread.currentThread().getId() + ")";
    }
}

在创建过程中观察到的可观察对象并且工作正在从执行器的三个线程中的一个订阅。这按预期工作。但是如何在动态创建的线程上观察结果?有没有一种方法可以从当前线程创建调度程序?

另外,我发现运行此代码后,它永远不会终止,我不知道为什么? :(


1
Android中的主线程不一定是调用线程。这里有一个重要的区别。它更类似于Java Swing中的EDT。 - Thorn G
当然,并不总是调用线程,但通常它被用作应该传递结果的线程。我应该更加精确。 - Filip Zymek
2个回答

91
为了回答你的问题,让我从头开始解释。这样可以让其他人了解你已经知道的内容。
调度器
调度器在Java中的作用与执行器相同。简单来说,它们决定在哪个线程上执行操作。
通常,Observable和操作符在当前线程上执行。有时候,你可以将调度器传递给Observable或操作符作为参数(例如:Observable.timer())。
此外,RxJava提供了两个操作符来指定调度器:
- subscribeOn - 指定Observable将要操作的调度器 - observeOn - 指定观察者将要观察该Observable的调度器
为了快速理解它们,我使用以下示例代码:
在所有的示例中,我都将使用辅助函数createObservable,它会发出Observable所在线程的名称:
 public static Observable<String> createObservable(){
        return Observable.create((Subscriber<? super String> subscriber) -> {
                subscriber.onNext(Thread.currentThread().getName());
                subscriber.onCompleted();
            }
        );
    }

没有调度程序:

createObservable().subscribe(message -> {
        System.out.println("Case 1 Observable thread " + message);
        System.out.println("Case 1 Observer thread " + Thread.currentThread().getName());
    });
    //will print:
    //Case 1 Observable thread main
    //Case 1 Observer thread main

订阅于:

createObservable()
            .subscribeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 2 Observable thread " + message);
                System.out.println("Case 2 Observer thread " + Thread.currentThread().getName());
            });
            //will print:
            //Case 2 Observable thread RxNewThreadScheduler-1
            //Case 2 Observer thread RxNewThreadScheduler-1

SubscribeOn 和 ObserveOn:

reateObservable()
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 3 Observable thread " + message);
                System.out.println("Case 3 Observer thread " + Thread.currentThread().getName());
            });
            //will print:
            //Case 3 Observable thread RxNewThreadScheduler-2
            //Case 3 Observer thread RxNewThreadScheduler-1

ObserveOn:

createObservable()
            .observeOn(Schedulers.newThread())
            .subscribe(message -> {
                System.out.println("Case 4 Observable thread " + message);
                System.out.println("Case 4 Observer thread " + Thread.currentThread().getName());
            });
            //will print:
            //Case 4 Observable thread main
            //Case 4 Observer thread RxNewThreadScheduler-1

答案:

AndroidSchedulers.mainThread() 返回一个调度器,该调度器将工作委派给与主线程关联的 MessageQueue。
为此,它使用 android.os.Looper.getMainLooper() 和 android.os.Handler。

换句话说,如果您想要指定特定的线程,则必须提供一种在该线程上安排和执行任务的方法。

在底层,它可以使用任何类型的 MQ 存储任务和循环逻辑,以便执行任务。

在 Java 中,我们有 Executor,专门用于此类任务。RxJava 可以轻松地从此类 Executor 创建 Scheduler。

下面是一个示例,演示了如何在主线程上观察(虽然没有特别实用,但展示了所有所需的部分)。

public class RunCurrentThread implements Executor {

    private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

    public static void main(String[] args) throws InterruptedException {
        RunCurrentThread sample = new RunCurrentThread();
        sample.observerOnMain();
        sample.runLoop();
    }

    private void observerOnMain() {
        createObservable()
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.from(this))
                .subscribe(message -> {
                    System.out.println("Observable thread " + message);
                    System.out.println("Observer thread " + Thread.currentThread().getName());
                });
        ;
    }

    public Observable<String> createObservable() {
        return Observable.create((Subscriber<? super String> subscriber) -> {
                    subscriber.onNext(Thread.currentThread().getName());
                    subscriber.onCompleted();
                }
        );
    }

    private void runLoop() throws InterruptedException {
        while(!Thread.interrupted()){
            tasks.take().run();
        }
    }

    @Override
    public void execute(Runnable command) {
        tasks.add(command);
    }
}

最后一个问题,为什么你的代码没有终止:

ThreadPoolExecutor默认使用非守护线程,因此你的程序不会结束直到这些线程不存在。你应该使用shutdown方法关闭这些线程。


1
在“SubscribeOn和ObserveOn”示例中,您可能无法获得结果,这可能是因为在“subscribeOn”中使用了newThread。有没有方便的方法来阻止它?使用“toBlocking”方法不允许使用“subscribe”方法。 - hi_my_name_is
1
你可以用许多方法来解决问题,其中最简单的方法是使用Thread.sleep(1000)。 - Marek Hawrylczak
8
你的例子不是反了吗?订阅者接收到的“message”应该是 Observable 线程的名称,而“Thread.currentThread().getName()”是观察者线程的名称。 - Rodrigo Quesada
我同意Rodrigo的观点。否则我无法理解这些信息。尽管如此,这是一个很好的简单示例,说明subscribeOn和observeOn是如何工作的。 - Serge Tahé
1
作为给那些阅读了@RodrigoQuesada评论的人的信息,这个小错误现在已经被纠正了。 - Daniel F
这对于理解 Schedulers.from() 很有用,使用 blocking* 订阅(例如 blockingForEach)是否更容易? - MainActivity

7
这是一个针对 RxJava 2 的简化示例。它与 Marek 的答案概念相同:一个 Executor 将 Runnable 添加到 BlockingQueue 中,该队列在调用方线程上被消耗。请注意,保留了 HTML 标记。
public class ThreadTest {

    @Test
    public void test() throws InterruptedException {

        final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

        System.out.println("Caller thread: " + Thread.currentThread().getName());

        Observable.fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("Observable thread: " + Thread.currentThread().getName());
                return 1;
            }
        })
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.from(new Executor() {
                @Override
                public void execute(@NonNull Runnable runnable) {
                    tasks.add(runnable);
                }
            }))
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    System.out.println("Observer thread: " + Thread.currentThread().getName());
                }
            });
        tasks.take().run();
    }

}

// Output: 
// Caller thread main
// Observable thread RxCachedThreadScheduler-1
// Observer thread main

嘿,在你的回答中,.observeOn(Schedulers.from(new Executor() { ... 是否会在主线程上执行?例如,subscribeOn 在后台运行,然后 observeOn 再次在主线程上运行?我来自 RxJava 2,那里我可以简单地使用 AndroidSchedulers.mainThread(),我正在寻找 RxJava 1 中的等效方法。 - Tom
如果是从调用线程中调用的话,就在Tom线程中执行。但是这个例子看起来并不是要在主线程上运行,因为由于阻塞队列,整个执行过程会阻塞调用线程。 - Daniel F
1
此示例只调用了 tasks.take() 一次,所以在处理完一个事件后就完成了。这与大多数生产环境下的 BlockingQueue 代码不同,因为你需要使用某种无限循环来继续处理条目。 - miguel

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接