在主线程中运行 Flow

5

我再次比较了RxJava和Java 9 Flow。我发现Flow默认是异步的,我想知道是否有办法使其同步运行。

有时我们只想使用它来进行语法糖而不是Nio,并且希望代码更加统一。

RxJava默认情况下是同步的,您可以在管道中使用observerOnsubscribeOn使其异步运行。

Flow中是否有任何操作符可以使其在主线程上运行?

敬礼。

3个回答

4
您可以按照 Flow 中的文档定义自定义的Publisher,以使用同步执行。

一个非常简单的发布者,当被请求时只向单个订阅者发布一个TRUE项目。由于订阅者仅接收单个项目,因此该类不使用缓冲和排序控制。

class OneShotPublisher implements Publisher<Boolean> {
   private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
   private boolean subscribed; // true after first subscribe
   public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
     if (subscribed)
       subscriber.onError(new IllegalStateException()); // only one allowed
     else {
       subscribed = true;
       subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
     }
   }
   static class OneShotSubscription implements Subscription {
     private final Subscriber<? super Boolean> subscriber;
     private final ExecutorService executor;
     private Future<?> future; // to allow cancellation
     private boolean completed;
     OneShotSubscription(Subscriber<? super Boolean> subscriber,
                         ExecutorService executor) {
       this.subscriber = subscriber;
       this.executor = executor;
     }
     public synchronized void request(long n) {
       if (n != 0 && !completed) {
         completed = true;
         if (n < 0) {
           IllegalArgumentException ex = new IllegalArgumentException();
           executor.execute(() -> subscriber.onError(ex));
         } else {
           future = executor.submit(() -> {
             subscriber.onNext(Boolean.TRUE);
             subscriber.onComplete();
           });
         }
       }
     }
     public synchronized void cancel() {
       completed = true;
       if (future != null) future.cancel(false);
     }
   }
 }

3

没有直接的操作符可以实现此功能,但API允许您控制项目发布的方式。因此,您可以直接从当前线程调用订阅者方法。

class SynchronousPublisher implements Publisher<Data>  {
      public synchronized void subscribe(Subscriber<? super Data> subscriber) {
           subscriber.onSubscribe(new SynchronousSubscription(subscriber));
      }
 }
 static class SynchronousSubscription implements Subscription {
      private final Subscriber<? super Data> subscriber;

       SynchronousSubscription(Subscriber<? super Data> subscriber) {
          this.subscriber = subscriber;
       }
       public synchronized void request(long n) {
           ... // prepare item            
           subscriber.onNext(someItem);      
       }

       ...
   }
}

谢谢,这也是一个不错的解决方法,虽然有点啰嗦,但是嘿!这就是Java :) - paul
1
@paul 请记住,Java将其作为RS规范提供(没有实现)。它不是RxJava的替代品。只提供最少量的内容以充当规范。 - M A

2
这取决于您所说的“在主线程上运行”的含义。
如果您想强制在特定线程上执行任意Flow,除非该Flow是在允许重写异步提供部分的库中实现的,否则没有标准方法可以做到这一点。在RxJava术语中,这些是由Schedulers实用程序类提供的Scheduler。
如果您想在主线程上观察Flow,则必须编写一个基于Flow.Subscriber的阻塞队列消费者,该消费者会阻塞线程直到队列有项目。这可能会变得复杂,因此我会引用Reactive4JavaFlow中的blockingSubscribe实现。
如果你想将Java主线程作为一个Executor/Scheduler使用,那会更加复杂,需要类似的阻塞机制以及一些来自线程池执行器的思路。Reactive4JavaFlow恰好有这样一个调度器,你可以通过以下方式将其用作执行器:new SubmissionPublisher<>(128, blockingScheduler::schedule)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接