如何在RxScala/Java中使用多线程执行map、filter和flatMap操作?

4

如何使用多个线程在Observable上运行filtermapflatMap

  def withDelay[T](delay: Duration)(t: => T) = {
    Thread.sleep(delay.toMillis)
    t
  }

  Observable
    .interval(500 millisecond)
    .filter(x => {
      withDelay(1 second) { x % 2 == 0 }
    })
    .map(x => {
      withDelay(1 second) { x * x }
    }).subscribe(println(_))

目标是使用多个线程同时运行过滤和转换操作。

你看过这个吗:https://github.com/ReactiveX/RxJava/issues/1673 和这个:https://github.com/ReactiveX/RxJavaParallel - david.mihola
@david.mihola,是的,我检查了它们两个,并能够在多个线程中执行“subscribe”块,但我无法对map、flatMap和filter执行此操作。我假设在过滤或转换时可能会调用其他API或从数据库获取其他数据,因此我想确保此代码将被并发执行。 - Stephen L.
3个回答

0

这将在不同的线程中处理集合的每个项目(rxjava3)。

var collect = Observable.fromIterable(Arrays.asList("A", "B", "C"))
                      .flatMap(v -> {
                         return Observable.just(v)
                                        .observeOn(Schedulers.computation())
                                        .map(v1 -> {
                                            int time = ThreadLocalRandom.current().nextInt(1000);
                                            Thread.sleep(time);
                                            return String.format("processed-%s", v1);
                                        });
                      })
                      .observeOn(Schedulers.computation())
                      .blockingStream()
                      .collect(Collectors.toList());

0

你可以在每个操作上使用Async.toAsync()。

它在rxjava-async包中。

文档


-1

你必须使用observeOn操作符,它会在设置该操作符后执行特定线程中的所有后续操作符。

       /**
 * Once that you set in your pipeline the observerOn all the next steps of your pipeline will be executed in another thread.
 * Shall print
 * First step main
 * Second step RxNewThreadScheduler-2
 * Third step RxNewThreadScheduler-1
 */
@Test
public void testObservableObserverOn() throws InterruptedException {
    Subscription subscription = Observable.just(1)
            .doOnNext(number -> System.out.println("First step " + Thread.currentThread()
                    .getName()))
            .observeOn(Schedulers.newThread())
            .doOnNext(number -> System.out.println("Second step " + Thread.currentThread()
                    .getName()))
            .observeOn(Schedulers.newThread())
            .doOnNext(number -> System.out.println( "Third step " + Thread.currentThread()
                    .getName()))
            .subscribe();
    new TestSubscriber((Observer) subscription)
            .awaitTerminalEvent(100, TimeUnit.MILLISECONDS);
}

这里有更多的异步示例https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java


是的,但如果您使用.doOnNext(),则无法执行像map、flat等操作。它只接受操作。 - FedericoAlvarez
这只是一个例子,你应该能够使用任何运算符。 - paul

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接