我再次比较了RxJava和Java 9 Flow。我发现Flow默认是异步的,我想知道是否有办法使其同步运行。
有时我们只想使用它来进行语法糖而不是Nio,并且希望代码更加统一。
RxJava默认情况下是同步的,您可以在管道中使用observerOn
和subscribeOn
使其异步运行。
Flow中是否有任何操作符可以使其在主线程上运行?
敬礼。
Flow
中的文档定义自定义的Publisher
,以使用同步执行。
一个非常简单的发布者,当被请求时只向单个订阅者发布一个TRUE项目。由于订阅者仅接收单个项目,因此该类不使用缓冲和排序控制。
class OneShotPublisher implements Publisher<Boolean> {
private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
private boolean subscribed; // true after first subscribe
public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
if (subscribed)
subscriber.onError(new IllegalStateException()); // only one allowed
else {
subscribed = true;
subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
}
}
static class OneShotSubscription implements Subscription {
private final Subscriber<? super Boolean> subscriber;
private final ExecutorService executor;
private Future<?> future; // to allow cancellation
private boolean completed;
OneShotSubscription(Subscriber<? super Boolean> subscriber,
ExecutorService executor) {
this.subscriber = subscriber;
this.executor = executor;
}
public synchronized void request(long n) {
if (n != 0 && !completed) {
completed = true;
if (n < 0) {
IllegalArgumentException ex = new IllegalArgumentException();
executor.execute(() -> subscriber.onError(ex));
} else {
future = executor.submit(() -> {
subscriber.onNext(Boolean.TRUE);
subscriber.onComplete();
});
}
}
}
public synchronized void cancel() {
completed = true;
if (future != null) future.cancel(false);
}
}
}
没有直接的操作符可以实现此功能,但API允许您控制项目发布的方式。因此,您可以直接从当前线程调用订阅者方法。
class SynchronousPublisher implements Publisher<Data> {
public synchronized void subscribe(Subscriber<? super Data> subscriber) {
subscriber.onSubscribe(new SynchronousSubscription(subscriber));
}
}
static class SynchronousSubscription implements Subscription {
private final Subscriber<? super Data> subscriber;
SynchronousSubscription(Subscriber<? super Data> subscriber) {
this.subscriber = subscriber;
}
public synchronized void request(long n) {
... // prepare item
subscriber.onNext(someItem);
}
...
}
}
Executor
/Scheduler
使用,那会更加复杂,需要类似的阻塞机制以及一些来自线程池执行器的思路。Reactive4JavaFlow恰好有这样一个调度器,你可以通过以下方式将其用作执行器:new SubmissionPublisher<>(128, blockingScheduler::schedule)
。